diff --git a/catalogs/catalog-messaging-kafka/build.gradle.kts b/catalogs/catalog-messaging-kafka/build.gradle.kts
index a992063ee09..0757fec6bac 100644
--- a/catalogs/catalog-messaging-kafka/build.gradle.kts
+++ b/catalogs/catalog-messaging-kafka/build.gradle.kts
@@ -16,10 +16,14 @@ dependencies {
   implementation(project(":common"))
   implementation(libs.guava)
+  implementation(libs.kafka.clients)
+  implementation(libs.slf4j.api)

+  testImplementation(libs.commons.io)
+  testImplementation(libs.curator.test)
   testImplementation(libs.junit.jupiter.api)
+  testImplementation(libs.kafka)
   testImplementation(libs.mockito.core)
-  testImplementation(libs.commons.io)

   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java
index 39ec5eb108c..24d36598796 100644
--- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java
+++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaCatalogOperations.java
@@ -6,6 +6,8 @@
 import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
 import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
+import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;
+import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.REPLICATION_FACTOR;
 import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;

 import com.datastrato.gravitino.Entity;
@@ -35,31 +37,60 @@ import com.datastrato.gravitino.rel.SchemaChange;
 import com.datastrato.gravitino.rel.SupportsSchemas;
 import com.datastrato.gravitino.storage.IdGenerator;
+import com.datastrato.gravitino.utils.PrincipalUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class KafkaCatalogOperations implements CatalogOperations, SupportsSchemas, TopicCatalog {

+  private static final Logger LOG = LoggerFactory.getLogger(KafkaCatalogOperations.class);
   private static final KafkaCatalogPropertiesMetadata CATALOG_PROPERTIES_METADATA =
       new KafkaCatalogPropertiesMetadata();
   private static final KafkaSchemaPropertiesMetadata SCHEMA_PROPERTIES_METADATA =
       new KafkaSchemaPropertiesMetadata();
   private static final KafkaTopicPropertiesMetadata TOPIC_PROPERTIES_METADATA =
       new KafkaTopicPropertiesMetadata();
+  private static final String DEFAULT_SCHEMA_NAME = "default";
+  @VisibleForTesting static final String CLIENT_ID_TEMPLATE = "%s-%s.%s";

   private final EntityStore store;
   private final IdGenerator idGenerator;
-  private final String DEFAULT_SCHEMA_NAME = "default";
   @VisibleForTesting NameIdentifier defaultSchemaIdent;
   @VisibleForTesting Properties adminClientConfig;
   private CatalogInfo info;
+  private AdminClient adminClient;

   @VisibleForTesting
   KafkaCatalogOperations(EntityStore store, IdGenerator idGenerator) {
@@ -92,39 +123,196 @@ public void initialize(Map<String, String> config, CatalogInfo info) throws Runt
                 e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()),
                 Map.Entry::getValue));
     adminClientConfig.putAll(bypassConfigs);
-    adminClientConfig.put(BOOTSTRAP_SERVERS, config.get(BOOTSTRAP_SERVERS));
+    adminClientConfig.put(
+        AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(BOOTSTRAP_SERVERS));
     // use gravitino catalog id as the admin client id
-    adminClientConfig.put("client.id", config.get(ID_KEY));
+    adminClientConfig.put(
+        AdminClientConfig.CLIENT_ID_CONFIG,
+        String.format(CLIENT_ID_TEMPLATE, config.get(ID_KEY), info.namespace(), info.name()));

     createDefaultSchema();
+    adminClient = AdminClient.create(adminClientConfig);
   }

   @Override
   public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaException {
-    throw new UnsupportedOperationException();
+    NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+    checkSchemaExists(schemaIdent);
+
+    try {
+      ListTopicsResult result = adminClient.listTopics();
+      Set<String> topicNames = result.names().get();
+      return topicNames.stream()
+          .map(name -> NameIdentifier.of(namespace, name))
+          .toArray(NameIdentifier[]::new);
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Failed to list topics under the schema " + namespace, e);
+    }
   }

   @Override
   public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
-    throw new UnsupportedOperationException();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    checkSchemaExists(schemaIdent);
+
+    DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton(ident.name()));
+    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, ident.name());
+    DescribeConfigsResult configsResult =
+        adminClient.describeConfigs(Collections.singleton(configResource));
+
+    int partitions;
+    int replicationFactor;
+    Map<String, String> properties = Maps.newHashMap();
+    try {
+      TopicDescription topicDescription = result.topicNameValues().get(ident.name()).get();
+      partitions = topicDescription.partitions().size();
+      replicationFactor = topicDescription.partitions().get(0).replicas().size();
+
+      Config topicConfigs = configsResult.all().get().get(configResource);
+      topicConfigs.entries().forEach(e -> properties.put(e.name(), e.value()));
+      properties.put(PARTITION_COUNT, String.valueOf(partitions));
+      properties.put(REPLICATION_FACTOR, String.valueOf(replicationFactor));
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
+        throw new NoSuchTopicException(e, "Topic %s does not exist", ident);
+      } else {
+        throw new RuntimeException("Failed to load topic " + ident.name() + " from Kafka", e);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Failed to load topic " + ident.name() + " from Kafka", e);
+    }
+
+    LOG.info("Loaded topic {} from Kafka", ident);
+
+    return KafkaTopic.builder()
+        .withName(ident.name())
+        .withProperties(properties)
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                .withCreateTime(Instant.now())
+                .build())
+        .build();
   }

   @Override
   public Topic createTopic(
       NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
       throws NoSuchSchemaException, TopicAlreadyExistsException {
-    throw new UnsupportedOperationException();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    checkSchemaExists(schemaIdent);
+
+    try {
+      CreateTopicsResult createTopicsResult =
+          adminClient.createTopics(Collections.singleton(buildNewTopic(ident, properties)));
+      LOG.info(
+          "Created topic {} with {} partitions and replication factor {}",
+          ident,
+          createTopicsResult.numPartitions(ident.name()).get(),
+          createTopicsResult.replicationFactor(ident.name()).get());
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof TopicExistsException) {
+        throw new TopicAlreadyExistsException(e, "Topic %s already exists", ident);
+
+      } else if (e.getCause() instanceof InvalidReplicationFactorException) {
+        throw new IllegalArgumentException(
+            "Invalid replication factor for topic " + ident + ": " + e.getCause().getMessage(), e);
+
+      } else if (e.getCause() instanceof InvalidConfigurationException) {
+        throw new IllegalArgumentException(
+            "Invalid properties for topic " + ident + ": " + e.getCause().getMessage(), e);
+
+      } else {
+        throw new RuntimeException("Failed to create topic " + ident + " in Kafka", e);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Failed to create topic " + ident + " in Kafka", e);
+    }
+
+    return KafkaTopic.builder()
+        .withName(ident.name())
+        .withComment(comment)
+        .withProperties(properties)
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                .withCreateTime(Instant.now())
+                .build())
+        .build();
   }

   @Override
   public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
       throws NoSuchTopicException, IllegalArgumentException {
-    throw new UnsupportedOperationException();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    checkSchemaExists(schemaIdent);
+
+    KafkaTopic topic = (KafkaTopic) loadTopic(ident);
+    String newComment = topic.comment();
+    int oldPartitionCount = Integer.parseInt(topic.properties().get(PARTITION_COUNT));
+    int newPartitionCount = oldPartitionCount;
+    Map<String, String> alteredProperties = Maps.newHashMap(topic.properties());
+    List<AlterConfigOp> alterConfigOps = Lists.newArrayList();
+    for (TopicChange change : changes) {
+      if (change instanceof TopicChange.UpdateTopicComment) {
+        newComment = ((TopicChange.UpdateTopicComment) change).getNewComment();
+
+      } else if (change instanceof TopicChange.SetProperty) {
+        TopicChange.SetProperty setProperty = (TopicChange.SetProperty) change;
+        if (PARTITION_COUNT.equals(setProperty.getProperty())) {
+          // alter partition count
+          newPartitionCount = setPartitionCount(setProperty, newPartitionCount, alteredProperties);
+        } else {
+          // alter other properties
+          setProperty(setProperty, alteredProperties, alterConfigOps);
+        }
+
+      } else if (change instanceof TopicChange.RemoveProperty) {
+        removeProperty((TopicChange.RemoveProperty) change, alteredProperties, alterConfigOps);
+
+      } else {
+        throw new IllegalArgumentException("Unsupported topic change: " + change);
+      }
+    }
+
+    if (newPartitionCount != oldPartitionCount) {
+      doPartitionCountIncrement(ident.name(), newPartitionCount);
+    }
+
+    if (!alterConfigOps.isEmpty()) {
+      doAlterTopicConfig(ident.name(), alterConfigOps);
+    }
+
+    return KafkaTopic.builder()
+        .withName(ident.name())
+        .withComment(newComment)
+        .withProperties(alteredProperties)
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator(topic.auditInfo().creator())
+                .withCreateTime(topic.auditInfo().createTime())
+                .withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                .withLastModifiedTime(Instant.now())
+                .build())
+        .build();
   }

   @Override
   public boolean dropTopic(NameIdentifier ident) throws NoSuchTopicException {
-    throw new UnsupportedOperationException();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    checkSchemaExists(schemaIdent);
+
+    try {
+      adminClient.deleteTopics(Collections.singleton(ident.name())).all().get();
+      return true;
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof UnknownTopicOrPartitionException) {
+        return false;
+      } else {
+        throw new RuntimeException("Failed to drop topic " + ident.name() + " from Kafka", e);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Failed to drop topic " + ident.name() + " from Kafka", e);
+    }
   }

   @Override
@@ -205,7 +393,12 @@ public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationE
   }

   @Override
-  public void close() throws IOException {}
+  public void close() throws IOException {
+    if (adminClient != null) {
+      adminClient.close();
+      adminClient = null;
+    }
+  }

   @Override
   public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException {
@@ -217,6 +410,116 @@ public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationE
     throw new UnsupportedOperationException("Kafka catalog does not support table operations");
   }

+  /**
+   * Make sure the schema exists, otherwise throw an exception.
+   *
+   * @param ident The schema identifier.
+   * @throws NoSuchSchemaException If the schema does not exist.
+   */
+  private void checkSchemaExists(NameIdentifier ident) throws NoSuchSchemaException {
+    if (!schemaExists(ident)) {
+      LOG.warn("Kafka catalog schema {} does not exist", ident);
+      throw new NoSuchSchemaException("Schema %s does not exist", ident);
+    }
+  }
+
+  /**
+   * Set the new partition count for the topic if it is greater than the current partition count.
+   *
+   * @param setProperty The property change to set the partition count.
+   * @param currentPartitionCount The current partition count.
+   * @param properties The properties map to update.
+   * @return The new partition count.
+   */
+  private int setPartitionCount(
+      TopicChange.SetProperty setProperty,
+      int currentPartitionCount,
+      Map<String, String> properties) {
+    Preconditions.checkArgument(
+        PARTITION_COUNT.equals(setProperty.getProperty()), "Invalid property: %s", setProperty);
+
+    int targetPartitionCount = Integer.parseInt(setProperty.getValue());
+    if (targetPartitionCount == currentPartitionCount) {
+      return currentPartitionCount;
+    } else if (targetPartitionCount < currentPartitionCount) {
+      throw new IllegalArgumentException(
+          "Cannot reduce partition count from "
+              + currentPartitionCount
+              + " to "
+              + targetPartitionCount);
+    } else {
+      properties.put(PARTITION_COUNT, setProperty.getValue());
+      return targetPartitionCount;
+    }
+  }
+
+  private void setProperty(
+      TopicChange.SetProperty setProperty,
+      Map<String, String> alteredProperties,
+      List<AlterConfigOp> alterConfigOps) {
+    alteredProperties.put(setProperty.getProperty(), setProperty.getValue());
+    alterConfigOps.add(
+        new AlterConfigOp(
+            new ConfigEntry(setProperty.getProperty(), setProperty.getValue()),
+            AlterConfigOp.OpType.SET));
+  }
+
+  private void removeProperty(
+      TopicChange.RemoveProperty removeProperty,
+      Map<String, String> alteredProperties,
+      List<AlterConfigOp> alterConfigOps) {
+    Preconditions.checkArgument(
+        !PARTITION_COUNT.equals(removeProperty.getProperty()), "Cannot remove partition count");
+    alteredProperties.remove(removeProperty.getProperty());
+    alterConfigOps.add(
+        new AlterConfigOp(
+            new ConfigEntry(removeProperty.getProperty(), null), AlterConfigOp.OpType.DELETE));
+  }
+
+  private void doPartitionCountIncrement(String topicName, int newPartitionCount) {
+    try {
+      adminClient
+          .createPartitions(
+              Collections.singletonMap(topicName, NewPartitions.increaseTo(newPartitionCount)))
+          .all()
+          .get();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to increase partition count for topic " + topicName, e);
+    }
+  }
+
+  private void doAlterTopicConfig(String topicName, List<AlterConfigOp> alterConfigOps) {
+    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
+    try {
+      adminClient
+          .incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps))
+          .all()
+          .get();
+    } catch (UnknownTopicOrPartitionException e) {
+      throw new NoSuchTopicException(e, "Topic %s does not exist", topicName);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to alter topic properties for topic " + topicName, e);
+    }
+  }
+
+  private NewTopic buildNewTopic(NameIdentifier ident, Map<String, String> properties) {
+    Optional<Integer> partitionCount =
+        Optional.ofNullable(
+            (int) TOPIC_PROPERTIES_METADATA.getOrDefault(properties, PARTITION_COUNT));
+    Optional<Short> replicationFactor =
+        Optional.ofNullable(
+            (short) TOPIC_PROPERTIES_METADATA.getOrDefault(properties, REPLICATION_FACTOR));
+    NewTopic newTopic = new NewTopic(ident.name(), partitionCount, replicationFactor);
+    return newTopic.configs(buildNewTopicConfigs(properties));
+  }
+
+  private Map<String, String> buildNewTopicConfigs(Map<String, String> properties) {
+    Map<String, String> topicConfigs = Maps.newHashMap(properties);
+    topicConfigs.remove(PARTITION_COUNT);
+    topicConfigs.remove(REPLICATION_FACTOR);
+    return topicConfigs;
+  }
+
   private void createDefaultSchema() {
     // If the default schema already exists, do nothing
     try {
diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopic.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopic.java
new file mode 100644
index 00000000000..2e43ccf9427
--- /dev/null
+++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopic.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.kafka;
+
+import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;
+
+import com.datastrato.gravitino.connector.BaseTopic;
+import java.util.Optional;
+import org.apache.kafka.clients.admin.NewTopic;
+
+public class KafkaTopic extends BaseTopic {
+
+  public NewTopic toKafkaTopic(KafkaTopicPropertiesMetadata propertiesMetadata) {
+    Optional<Integer> partitionCount =
+        Optional.ofNullable((int) propertiesMetadata.getOrDefault(properties(), PARTITION_COUNT));
+    Optional<Short> replicationFactor =
+        Optional.ofNullable(
+            (short)
+                propertiesMetadata.getOrDefault(
+                    properties(), KafkaTopicPropertiesMetadata.REPLICATION_FACTOR));
+    return new NewTopic(name, partitionCount, replicationFactor);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends BaseTopicBuilder<Builder, KafkaTopic> {
+
+    @Override
+    protected KafkaTopic internalBuild() {
+      KafkaTopic topic = new KafkaTopic();
+      topic.name = name;
+      topic.comment = comment;
+      topic.properties = properties;
+      topic.auditInfo = auditInfo;
+      return topic;
+    }
+  }
+}
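Note: the alterTopic path above batches SET/DELETE operations and applies them in a single incrementalAlterConfigs call. A minimal, self-contained sketch of that same admin-client interaction (the broker address and topic name here are assumptions, not taken from this patch):

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterTopicConfigSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    try (AdminClient admin = AdminClient.create(props)) {
      ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
      // SET corresponds to TopicChange.setProperty, DELETE to TopicChange.removeProperty
      List<AlterConfigOp> ops =
          List.of(
              new AlterConfigOp(
                  new ConfigEntry("compression.type", "producer"), AlterConfigOp.OpType.SET),
              new AlterConfigOp(
                  new ConfigEntry("retention.ms", null), AlterConfigOp.OpType.DELETE));
      admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
    }
  }
}

A DELETE with a null-valued ConfigEntry reverts the config to the broker default, which is why the removed retention.ms in the test below reads back as 604800000.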
diff --git a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopicPropertiesMetadata.java b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopicPropertiesMetadata.java
index 111e326f8e0..d6a6bfe405e 100644
--- a/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopicPropertiesMetadata.java
+++ b/catalogs/catalog-messaging-kafka/src/main/java/com/datastrato/gravitino/catalog/kafka/KafkaTopicPropertiesMetadata.java
@@ -6,12 +6,41 @@
 import com.datastrato.gravitino.connector.BasePropertiesMetadata;
 import com.datastrato.gravitino.connector.PropertyEntry;
-import java.util.Collections;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.List;
 import java.util.Map;

 public class KafkaTopicPropertiesMetadata extends BasePropertiesMetadata {
+  public static final String PARTITION_COUNT = "partition-count";
+  public static final String REPLICATION_FACTOR = "replication-factor";
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
+
+  static {
+    List<PropertyEntry<?>> propertyEntries =
+        ImmutableList.of(
+            PropertyEntry.integerOptionalPropertyEntry(
+                PARTITION_COUNT,
+                "The number of partitions for the topic; if not specified, "
+                    + "the broker's num.partitions property will be used",
+                false /* immutable */,
+                null /* default value */,
+                false /* hidden */),
+            // TODO: make REPLICATION_FACTOR mutable if needed
+            PropertyEntry.shortOptionalPropertyEntry(
+                REPLICATION_FACTOR,
+                "The number of replicas for the topic; if not specified, "
+                    + "the broker's default.replication.factor property will be used",
+                true /* immutable */,
+                null /* default value */,
+                false /* hidden */));
+
+    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
+  }
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return Collections.emptyMap();
+    return PROPERTIES_METADATA;
   }
 }
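For reference, the translation buildNewTopic performs on these two keys boils down to the following sketch (the helper name and key literals are illustrative; empty Optionals defer to the broker's num.partitions and default.replication.factor, as the property descriptions state):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.admin.NewTopic;

public class NewTopicSketch {
  // Strip the two Gravitino-level keys and hand everything else to Kafka
  // unchanged as topic configs.
  static NewTopic fromProperties(String name, Map<String, String> props) {
    Map<String, String> configs = new HashMap<>(props);
    Optional<Integer> partitions =
        Optional.ofNullable(configs.remove("partition-count")).map(Integer::parseInt);
    Optional<Short> replicas =
        Optional.ofNullable(configs.remove("replication-factor")).map(Short::parseShort);
    return new NewTopic(name, partitions, replicas).configs(configs);
  }
}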
diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
index 88f68710655..9dacdd1625d 100644
--- a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
+++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/TestKafkaCatalogOperations.java
@@ -12,7 +12,10 @@ import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME;
 import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
 import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
+import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogOperations.CLIENT_ID_TEMPLATE;
 import static com.datastrato.gravitino.catalog.kafka.KafkaCatalogPropertiesMetadata.BOOTSTRAP_SERVERS;
+import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.PARTITION_COUNT;
+import static com.datastrato.gravitino.catalog.kafka.KafkaTopicPropertiesMetadata.REPLICATION_FACTOR;

 import com.datastrato.gravitino.Config;
 import com.datastrato.gravitino.Configs;
@@ -21,7 +24,13 @@ import com.datastrato.gravitino.EntityStoreFactory;
 import com.datastrato.gravitino.NameIdentifier;
 import com.datastrato.gravitino.Namespace;
+import com.datastrato.gravitino.catalog.kafka.embeddedKafka.KafkaClusterEmbedded;
 import com.datastrato.gravitino.connector.BasePropertiesMetadata;
+import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.datastrato.gravitino.exceptions.NoSuchTopicException;
+import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
+import com.datastrato.gravitino.messaging.Topic;
+import com.datastrato.gravitino.messaging.TopicChange;
 import com.datastrato.gravitino.meta.AuditInfo;
 import com.datastrato.gravitino.meta.CatalogEntity;
 import com.datastrato.gravitino.rel.Schema;
@@ -32,23 +41,23 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.config.TopicConfig;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;

-public class TestKafkaCatalogOperations {
+public class TestKafkaCatalogOperations extends KafkaClusterEmbedded {

   private static final String ROCKS_DB_STORE_PATH =
-      "/tmp/gravitino_test_entityStore_" + UUID.randomUUID().toString().replace("-", "");
+      "/tmp/gravitino_test_entityStore_" + genRandomString();
   private static final String METALAKE_NAME = "metalake";
   private static final String CATALOG_NAME = "test_kafka_catalog";
+  private static final String DEFAULT_SCHEMA_NAME = "default";
   private static final Map<String, String> MOCK_CATALOG_PROPERTIES =
-      ImmutableMap.of(
-          BOOTSTRAP_SERVERS, "localhost:9092", ID_KEY, "gravitino.v1.uid33220758755757000");
+      ImmutableMap.of(BOOTSTRAP_SERVERS, brokerList(), ID_KEY, "gravitino.v1.uid33220758755757000");

   private static EntityStore store;
   private static IdGenerator idGenerator;
   private static CatalogEntity kafkaCatalogEntity;
@@ -90,8 +99,10 @@ public static void setUp() {

   @AfterAll
   public static void tearDown() throws IOException {
-    store.close();
-    FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH));
+    if (store != null) {
+      store.close();
+      FileUtils.deleteDirectory(FileUtils.getFile(ROCKS_DB_STORE_PATH));
+    }
   }

   @Test
@@ -120,7 +131,12 @@ public void testKafkaCatalogConfiguration() {
         MOCK_CATALOG_PROPERTIES.get(BOOTSTRAP_SERVERS),
         ops.adminClientConfig.get(BOOTSTRAP_SERVERS));
     Assertions.assertEquals(
-        MOCK_CATALOG_PROPERTIES.get(ID_KEY), ops.adminClientConfig.get("client.id"));
+        String.format(
+            CLIENT_ID_TEMPLATE,
+            MOCK_CATALOG_PROPERTIES.get(ID_KEY),
+            catalogEntity.namespace(),
+            catalogName),
+        ops.adminClientConfig.get("client.id"));
   }

   @Test
@@ -143,13 +159,13 @@ public void testInitialization() {
     ops.initialize(MOCK_CATALOG_PROPERTIES, catalogEntity.toCatalogInfo());

     Assertions.assertNotNull(ops.defaultSchemaIdent);
-    Assertions.assertEquals("default", ops.defaultSchemaIdent.name());
+    Assertions.assertEquals(DEFAULT_SCHEMA_NAME, ops.defaultSchemaIdent.name());
     Assertions.assertEquals(
         METALAKE_NAME + "." + catalogName, ops.defaultSchemaIdent.namespace().toString());

     Assertions.assertTrue(ops.schemaExists(ops.defaultSchemaIdent));
     Schema schema = ops.loadSchema(ops.defaultSchemaIdent);
-    Assertions.assertEquals("default", schema.name());
+    Assertions.assertEquals(DEFAULT_SCHEMA_NAME, schema.name());
   }

   @Test
@@ -167,10 +183,10 @@ public void testCreateSchema() {

   @Test
   public void testLoadSchema() {
-    NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default");
+    NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME);
     Schema schema = kafkaCatalogOperations.loadSchema(ident);

-    Assertions.assertEquals("default", schema.name());
+    Assertions.assertEquals(DEFAULT_SCHEMA_NAME, schema.name());
     Assertions.assertEquals(
         "The default schema of Kafka catalog including all topics", schema.comment());
     Assertions.assertEquals(2, schema.properties().size());
@@ -186,7 +202,7 @@ public void testAlterSchema() {
             IllegalArgumentException.class,
             () ->
                 kafkaCatalogOperations.alterSchema(
-                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"),
+                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME),
                     SchemaChange.removeProperty("key1")));
     Assertions.assertEquals("Cannot alter the default schema", exception.getMessage());
@@ -208,7 +224,7 @@ public void testDropSchema() {
             IllegalArgumentException.class,
             () ->
                 kafkaCatalogOperations.dropSchema(
-                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "default"), true));
+                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME), true));
     Assertions.assertEquals("Cannot drop the default schema", exception.getMessage());

     NameIdentifier ident = NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema");
@@ -219,4 +235,210 @@
     Assertions.assertEquals(
         "Kafka catalog does not support schema deletion", exception.getMessage());
   }
+
+  @Test
+  public void testCreateTopic() {
+    NameIdentifier ident =
+        NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_create_topic");
+    String comment = "test comment";
+    Map<String, String> properties =
+        ImmutableMap.of(
+            PARTITION_COUNT,
+            "3",
+            REPLICATION_FACTOR,
+            "1",
+            TopicConfig.COMPRESSION_TYPE_CONFIG,
+            "producer");
+    Topic createdTopic = kafkaCatalogOperations.createTopic(ident, comment, null, properties);
+    Assertions.assertNotNull(createdTopic);
+    Assertions.assertEquals(ident.name(), createdTopic.name());
+    Assertions.assertEquals("3", createdTopic.properties().get(PARTITION_COUNT));
+    Assertions.assertEquals("1", createdTopic.properties().get(REPLICATION_FACTOR));
+    Assertions.assertEquals(
+        "producer", createdTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG));
+  }
+
+  @Test
+  public void testCreateTopicException() {
+    Map<String, String> properties = ImmutableMap.of(PARTITION_COUNT, "3", REPLICATION_FACTOR, "1");
+
+    // test topic already exists
+    Exception exception =
+        Assertions.assertThrows(
+            TopicAlreadyExistsException.class,
+            () ->
+                kafkaCatalogOperations.createTopic(
+                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, TOPIC_1),
+                    null,
+                    null,
+                    properties));
+    Assertions.assertEquals(
+        "Topic metalake.test_kafka_catalog.default.kafka-test-topic-1 already exists",
+        exception.getMessage());
+
+    // test schema not exists
+    exception =
+        Assertions.assertThrows(
+            NoSuchSchemaException.class,
+            () ->
+                kafkaCatalogOperations.createTopic(
+                    NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, "test_schema", "error_topic"),
+                    null,
+                    null,
+                    properties));
+    Assertions.assertEquals(
+        "Schema metalake.test_kafka_catalog.test_schema does not exist", exception.getMessage());
+
+    Map<String, String> wrongProperties =
+        ImmutableMap.of(PARTITION_COUNT, "3", REPLICATION_FACTOR, "3");
+    exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                kafkaCatalogOperations.createTopic(
+                    NameIdentifier.of(
+                        METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "error_topic"),
+                    null,
+                    null,
+                    wrongProperties));
+    Assertions.assertTrue(
+        exception.getMessage().contains("Invalid replication factor for topic"),
+        exception.getMessage());
+  }
+
+  @Test
+  public void testLoadTopic() {
+    Topic topic =
+        kafkaCatalogOperations.loadTopic(
+            NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, TOPIC_1));
+    Assertions.assertNotNull(topic);
+    Assertions.assertEquals(TOPIC_1, topic.name());
+    Assertions.assertEquals("1", topic.properties().get(PARTITION_COUNT));
+    Assertions.assertEquals("1", topic.properties().get(REPLICATION_FACTOR));
+    Assertions.assertTrue(topic.properties().size() > 2);
+  }
+
+  @Test
+  public void testLoadTopicException() {
+    Exception exception =
+        Assertions.assertThrows(
+            NoSuchTopicException.class,
+            () ->
+                kafkaCatalogOperations.loadTopic(
+                    NameIdentifier.of(
+                        METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "error_topic")));
+    Assertions.assertEquals(
+        "Topic metalake.test_kafka_catalog.default.error_topic does not exist",
+        exception.getMessage());
+  }
+
+  @Test
+  public void testListTopics() {
+    NameIdentifier[] topics =
+        kafkaCatalogOperations.listTopics(
+            Namespace.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME));
+    Assertions.assertTrue(topics.length > 0);
+
+    Exception exception =
+        Assertions.assertThrows(
+            NoSuchSchemaException.class,
+            () ->
+                kafkaCatalogOperations.listTopics(
+                    Namespace.of(METALAKE_NAME, CATALOG_NAME, "error_schema")));
+    Assertions.assertEquals(
+        "Schema metalake.test_kafka_catalog.error_schema does not exist", exception.getMessage());
+  }
+
+  @Test
+  public void testDropTopic() {
+    NameIdentifier ident =
+        NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_drop_topic");
+    Map<String, String> properties = ImmutableMap.of(PARTITION_COUNT, "3", REPLICATION_FACTOR, "1");
+    kafkaCatalogOperations.createTopic(ident, null, null, properties);
+    Assertions.assertNotNull(kafkaCatalogOperations.loadTopic(ident));
+
+    Assertions.assertTrue(kafkaCatalogOperations.dropTopic(ident));
+    Exception exception =
+        Assertions.assertThrows(
+            NoSuchTopicException.class, () -> kafkaCatalogOperations.loadTopic(ident));
+    Assertions.assertEquals(
+        "Topic metalake.test_kafka_catalog.default.test_drop_topic does not exist",
+        exception.getMessage());
+
+    Assertions.assertFalse(kafkaCatalogOperations.dropTopic(ident));
+  }
+
+  @Test
+  public void testAlterTopic() {
+    NameIdentifier ident =
+        NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "test_alter_topic");
+    Map<String, String> properties =
+        ImmutableMap.of(
+            PARTITION_COUNT,
+            "2",
+            REPLICATION_FACTOR,
+            "1",
+            TopicConfig.COMPRESSION_TYPE_CONFIG,
+            "gzip",
+            TopicConfig.RETENTION_MS_CONFIG,
+            "43200000");
+    Topic createdTopic = kafkaCatalogOperations.createTopic(ident, null, null, properties);
+
+    Topic alteredTopic =
+        kafkaCatalogOperations.alterTopic(
+            ident,
+            TopicChange.updateComment("new comment"),
+            TopicChange.setProperty(PARTITION_COUNT, "3"),
+            TopicChange.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, "producer"),
+            TopicChange.removeProperty(TopicConfig.RETENTION_MS_CONFIG));
+    Assertions.assertEquals(createdTopic.name(), alteredTopic.name());
+    Assertions.assertEquals("new comment", alteredTopic.comment());
+    Assertions.assertEquals("3", alteredTopic.properties().get(PARTITION_COUNT));
+    Assertions.assertEquals("1", alteredTopic.properties().get(REPLICATION_FACTOR));
+    Assertions.assertEquals(
+        "producer", alteredTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG));
+    Assertions.assertNull(alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+
+    // reload topic and check if the changes are applied
+    alteredTopic = kafkaCatalogOperations.loadTopic(ident);
+    Assertions.assertEquals(createdTopic.name(), alteredTopic.name());
+    // comment is null because it is not stored in the topic
+    Assertions.assertNull(alteredTopic.comment());
+    Assertions.assertEquals("3", alteredTopic.properties().get(PARTITION_COUNT));
+    Assertions.assertEquals("1", alteredTopic.properties().get(REPLICATION_FACTOR));
+    Assertions.assertEquals(
+        "producer", alteredTopic.properties().get(TopicConfig.COMPRESSION_TYPE_CONFIG));
+    // the retention.ms override was removed, so the broker default applies
+    Assertions.assertEquals(
+        "604800000", alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+
+    // test exception
+    Exception exception =
+        Assertions.assertThrows(
+            NoSuchTopicException.class,
+            () ->
+                kafkaCatalogOperations.alterTopic(
+                    NameIdentifier.of(
+                        METALAKE_NAME, CATALOG_NAME, DEFAULT_SCHEMA_NAME, "error_topic"),
+                    TopicChange.updateComment("new comment")));
+    Assertions.assertEquals(
+        "Topic metalake.test_kafka_catalog.default.error_topic does not exist",
+        exception.getMessage());
+
+    exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                kafkaCatalogOperations.alterTopic(
+                    ident, TopicChange.removeProperty(PARTITION_COUNT)));
+    Assertions.assertEquals("Cannot remove partition count", exception.getMessage());
+
+    exception =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                kafkaCatalogOperations.alterTopic(
+                    ident, TopicChange.setProperty(PARTITION_COUNT, "1")));
+    Assertions.assertEquals("Cannot reduce partition count from 3 to 1", exception.getMessage());
+  }
 }
diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaClusterEmbedded.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaClusterEmbedded.java
new file mode 100644
index 00000000000..4a4c98e18b0
--- /dev/null
+++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaClusterEmbedded.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.kafka.embeddedKafka;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+import java.util.UUID;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class KafkaClusterEmbedded {
+  public static final String TOPIC_1 = "kafka-test-topic-1";
+  public static final String TOPIC_2 = "kafka-test-topic-2";
+  public static final String TOPIC_3 = "kafka-test-topic-3";
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaClusterEmbedded.class);
+  private static ZooKeeperEmbedded zookeeper;
+  private static KafkaEmbedded broker;
+
+  /** Creates and starts the cluster. */
+  @BeforeAll
+  public static void start() throws Exception {
+    LOG.info("Initiating embedded Kafka cluster startup");
+    LOG.info("Starting a ZooKeeper instance...");
+    zookeeper = new ZooKeeperEmbedded();
+    LOG.info("ZooKeeper instance is running at {}", zookeeper.connectString());
+
+    Properties brokerConfig = initBrokerConfig();
+    LOG.info(
+        "Starting a Kafka instance on port {} ...",
+        brokerConfig.getProperty(KafkaConfig.ListenersProp()));
+    broker = new KafkaEmbedded(brokerConfig);
+    LOG.info(
+        "Kafka instance is running at {}, connected to ZooKeeper at {}",
+        broker.brokerList(),
+        broker.zookeeperConnect());
+
+    // Create initial topics
+    broker.createTopic(TOPIC_1);
+    broker.createTopic(TOPIC_2);
+    broker.createTopic(TOPIC_3);
+  }
+
+  @AfterAll
+  public static void stop() throws IOException {
+    LOG.info("Stopping embedded Kafka cluster");
+    if (broker != null) {
+      broker.stop();
+    }
+
+    if (zookeeper != null) {
+      zookeeper.stop();
+    }
+
+    LOG.info("Embedded Kafka cluster stopped");
+  }
+
+  protected static String genRandomString() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  public static String brokerList() {
+    return broker.brokerList();
+  }
+
+  private static Properties initBrokerConfig() {
+    Properties configs = new Properties();
+    configs.put(KafkaConfig$.MODULE$.ZkConnectProp(), zookeeper.connectString());
+    configs.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 30 * 1000);
+    configs.put(KafkaConfig$.MODULE$.ZkConnectionTimeoutMsProp(), 60 * 1000);
+    configs.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+    configs.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+    configs.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+    configs.put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
+    configs.put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 1);
+    configs.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+    // Find a random port
+    try (ServerSocket socket = new ServerSocket(0)) {
+      socket.setReuseAddress(true);
+      configs.put(
+          KafkaConfig.ListenersProp(),
+          String.format("PLAINTEXT://127.0.0.1:%s", socket.getLocalPort()));
+    } catch (IOException e) {
+      throw new RuntimeException("Can't find a port to start embedded Kafka broker", e);
+    }
+    return configs;
+  }
+}
diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaEmbedded.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaEmbedded.java
new file mode 100644
index 00000000000..f9d63abda9f
--- /dev/null
+++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/KafkaEmbedded.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.kafka.embeddedKafka;
+
+import static com.datastrato.gravitino.catalog.kafka.embeddedKafka.KafkaClusterEmbedded.genRandomString;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.cluster.EndPoint;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+public class KafkaEmbedded {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaEmbedded.class);
+  private static final String LOG_DIR = "/tmp/gravitino_test_embeddedKafka_" + genRandomString();
+
+  private final Properties effectiveConfig;
+  private final KafkaServer kafka;
+
+  public KafkaEmbedded(final Properties config) {
+    effectiveConfig = effectiveConfigFrom(config);
+    final boolean loggingEnabled = true;
+
+    final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
+    LOG.info("Starting embedded Kafka broker (with ZK ensemble at {}) ...", zookeeperConnect());
+    kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, Option.apply("embedded-kafka-broker"), false);
+    kafka.startup();
+    LOG.info(
+        "Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+        brokerList(),
+        zookeeperConnect());
+  }
+
+  public void createTopic(String topic) {
+    Properties properties = new Properties();
+    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
+
+    try (AdminClient adminClient = AdminClient.create(properties)) {
+      NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
+      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException("Failed to create topic " + topic, e);
+    }
+  }
+
+  /**
+   * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`.
+   *
+   * <p>You can use this to tell Kafka producers and consumers how to connect to this instance.
+   */
+  public String brokerList() {
+    final EndPoint endPoint = kafka.advertisedListeners().head();
+    final String hostname = endPoint.host() == null ? "" : endPoint.host();
+
+    return String.join(
+        ":",
+        hostname,
+        Integer.toString(
+            kafka.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))));
+  }
+
+  /** The ZooKeeper connection string aka `zookeeper.connect`. */
+  public String zookeeperConnect() {
+    return effectiveConfig.getProperty("zookeeper.connect");
+  }
+
+  /** Stop the broker. */
+  public void stop() throws IOException {
+    LOG.info(
+        "Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...",
+        brokerList(),
+        zookeeperConnect());
+    kafka.shutdown();
+    kafka.awaitShutdown();
+    FileUtils.deleteDirectory(FileUtils.getFile(LOG_DIR));
+    LOG.info(
+        "Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...",
+        brokerList(),
+        zookeeperConnect());
+  }
+
+  private Properties effectiveConfigFrom(final Properties initialConfig) {
+    final Properties effectiveConfig = new Properties();
+    effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
+    effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
+    effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
+    effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
+    effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
+
+    effectiveConfig.putAll(initialConfig);
+    effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), LOG_DIR);
+    return effectiveConfig;
+  }
+}
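A smoke test for an embedded broker like this can round-trip a record with the plain Kafka clients; a minimal sketch, assuming the address returned by brokerList() is passed in (group id and timeout are arbitrary):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmbeddedBrokerSmokeTest {
  static void roundTrip(String bootstrapServers) throws Exception {
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", bootstrapServers);
    producerProps.put("key.serializer", StringSerializer.class.getName());
    producerProps.put("value.serializer", StringSerializer.class.getName());
    // Produce one record to a topic created by the harness above
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      producer.send(new ProducerRecord<>("kafka-test-topic-1", "k", "v")).get();
    }

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", bootstrapServers);
    consumerProps.put("group.id", "smoke-test");
    consumerProps.put("auto.offset.reset", "earliest");
    consumerProps.put("key.deserializer", StringDeserializer.class.getName());
    consumerProps.put("value.deserializer", StringDeserializer.class.getName());
    // Consume it back to confirm the broker is actually serving traffic
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(Collections.singleton("kafka-test-topic-1"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
      if (records.isEmpty()) {
        throw new IllegalStateException("Embedded broker did not deliver the test record");
      }
    }
  }
}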
diff --git a/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/ZooKeeperEmbedded.java b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/ZooKeeperEmbedded.java
new file mode 100644
index 00000000000..bab31e83f17
--- /dev/null
+++ b/catalogs/catalog-messaging-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/embeddedKafka/ZooKeeperEmbedded.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.kafka.embeddedKafka;
+
+import java.io.IOException;
+import org.apache.curator.test.TestingServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperEmbedded {
+  private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperEmbedded.class);
+  private final TestingServer server;
+
+  /**
+   * Creates and starts a ZooKeeper instance.
+   *
+   * @throws Exception if an error occurs during ZooKeeper startup
+   */
+  public ZooKeeperEmbedded() throws Exception {
+    LOG.info("Starting embedded ZooKeeper server...");
+    this.server = new TestingServer();
+    LOG.info(
+        "Embedded ZooKeeper server at {} uses the temp directory at {}",
+        server.getConnectString(),
+        server.getTempDirectory());
+  }
+
+  public void stop() throws IOException {
+    LOG.info("Shutting down embedded ZooKeeper server at {} ...", server.getConnectString());
+    server.close();
+    LOG.info("Shutdown of embedded ZooKeeper server at {} completed", server.getConnectString());
+  }
+
+  /**
+   * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+   * Example: `127.0.0.1:2181`.
+   *
+   * <p>You can use this to e.g. tell Kafka brokers how to connect to this instance.
+   */
+  public String connectString() {
+    return server.getConnectString();
+  }
+}
diff --git a/core/src/main/java/com/datastrato/gravitino/connector/BaseTopic.java b/core/src/main/java/com/datastrato/gravitino/connector/BaseTopic.java
new file mode 100644
index 00000000000..f68fc233e45
--- /dev/null
+++ b/core/src/main/java/com/datastrato/gravitino/connector/BaseTopic.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.connector;
+
+import com.datastrato.gravitino.Audit;
+import com.datastrato.gravitino.annotation.Evolving;
+import com.datastrato.gravitino.messaging.Topic;
+import com.datastrato.gravitino.meta.AuditInfo;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/** An abstract class representing a base topic in a messaging system. */
+@Evolving
+public abstract class BaseTopic implements Topic {
+
+  protected String name;
+
+  @Nullable protected String comment;
+
+  @Nullable protected Map<String, String> properties;
+
+  protected Audit auditInfo;
+
+  /** @return The name of the topic. */
+  @Override
+  public String name() {
+    return name;
+  }
+
+  /** @return The comment or description for the topic. */
+  @Nullable
+  @Override
+  public String comment() {
+    return comment;
+  }
+
+  /** @return The associated properties of the topic. */
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  /** @return The audit information for the topic. */
+  @Override
+  public Audit auditInfo() {
+    return auditInfo;
+  }
+
+  /**
+   * Builder interface for {@link BaseTopic}.
+   *
+   * @param <SELF> The type of the builder.
+   * @param <T> The type of the topic being built.
+   */
+  interface Builder<SELF extends Builder<SELF, T>, T extends BaseTopic> {
+
+    SELF withName(String name);
+
+    SELF withComment(String comment);
+
+    SELF withProperties(Map<String, String> properties);
+
+    SELF withAuditInfo(AuditInfo auditInfo);
+
+    T build();
+  }
+
+  /**
+   * An abstract class implementing the builder interface for {@link BaseTopic}. This class should
+   * be extended by the concrete topic builders.
+   *
+   * @param <SELF> The type of the builder.
+   * @param <T> The type of the topic being built.
+   */
+  public abstract static class BaseTopicBuilder<
+          SELF extends BaseTopicBuilder<SELF, T>, T extends BaseTopic>
+      implements Builder<SELF, T> {
+    protected String name;
+    protected String comment;
+    protected Map<String, String> properties;
+    protected AuditInfo auditInfo;
+
+    /**
+     * Sets the name of the topic.
+     *
+     * @param name The name of the topic.
+     * @return The builder instance.
+     */
+    @Override
+    public SELF withName(String name) {
+      this.name = name;
+      return self();
+    }
+
+    /**
+     * Sets the comment of the topic.
+     *
+     * @param comment The comment or description for the topic.
+     * @return The builder instance.
+     */
+    @Override
+    public SELF withComment(String comment) {
+      this.comment = comment;
+      return self();
+    }
+
+    /**
+     * Sets the associated properties of the topic.
+     *
+     * @param properties The associated properties of the topic.
+     * @return The builder instance.
+     */
+    @Override
+    public SELF withProperties(Map<String, String> properties) {
+      this.properties = properties;
+      return self();
+    }
+
+    /**
+     * Sets the audit information for the topic.
+     *
+     * @param auditInfo The audit information for the topic.
+     * @return The builder instance.
+     */
+    @Override
+    public SELF withAuditInfo(AuditInfo auditInfo) {
+      this.auditInfo = auditInfo;
+      return self();
+    }
+
+    /**
+     * Builds the topic with the provided attributes.
+     *
+     * @return The built topic instance.
+     */
+    @Override
+    public T build() {
+      T t = internalBuild();
+      return t;
+    }
+
+    private SELF self() {
+      return (SELF) this;
+    }
+
+    /**
+     * Builds the concrete instance of the topic with the provided attributes.
+     *
+     * @return The concrete instance of the topic.
+     */
+    @Evolving
+    protected abstract T internalBuild();
+  }
+}
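The recursive SELF type parameter above is easiest to see in a concrete subclass; a minimal hypothetical example (DemoTopic is invented for illustration, mirroring the KafkaTopic subclass earlier in this patch):

import com.datastrato.gravitino.connector.BaseTopic;

// Hypothetical connector topic showing the BaseTopicBuilder pattern:
// the builder's SELF type parameter lets withName()/withComment() chain
// without casts while still returning the concrete builder type.
public class DemoTopic extends BaseTopic {

  public static Builder builder() {
    return new Builder();
  }

  public static class Builder extends BaseTopicBuilder<Builder, DemoTopic> {
    @Override
    protected DemoTopic internalBuild() {
      DemoTopic topic = new DemoTopic();
      topic.name = name;
      topic.comment = comment;
      topic.properties = properties;
      topic.auditInfo = auditInfo;
      return topic;
    }
  }
}

Because withName() returns SELF rather than the base builder type, DemoTopic.builder().withName("t").withComment("c").build() compiles without casts.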
diff --git a/core/src/main/java/com/datastrato/gravitino/connector/PropertyEntry.java b/core/src/main/java/com/datastrato/gravitino/connector/PropertyEntry.java
index e67ca450be5..577ecd481cb 100644
--- a/core/src/main/java/com/datastrato/gravitino/connector/PropertyEntry.java
+++ b/core/src/main/java/com/datastrato/gravitino/connector/PropertyEntry.java
@@ -222,6 +222,28 @@ public static PropertyEntry<Integer> integerPropertyEntry(
         .build();
   }

+  public static PropertyEntry<Short> shortPropertyEntry(
+      String name,
+      String description,
+      boolean required,
+      boolean immutable,
+      Short defaultValue,
+      boolean hidden,
+      boolean reserved) {
+    return new Builder<Short>()
+        .withName(name)
+        .withDescription(description)
+        .withRequired(required)
+        .withImmutable(immutable)
+        .withJavaType(Short.class)
+        .withDefaultValue(defaultValue)
+        .withDecoder(Short::parseShort)
+        .withEncoder(String::valueOf)
+        .withHidden(hidden)
+        .withReserved(reserved)
+        .build();
+  }
+
   public static PropertyEntry<String> stringReservedPropertyEntry(
       String name, String description, boolean hidden) {
     return stringPropertyEntry(name, description, false, true, null, hidden, true);
@@ -264,6 +286,11 @@ public static PropertyEntry<String> stringOptionalPropertyEntry(
     return stringPropertyEntry(name, description, false, immutable, defaultValue, hidden, false);
   }

+  public static PropertyEntry<Short> shortOptionalPropertyEntry(
+      String name, String description, boolean immutable, Short defaultValue, boolean hidden) {
+    return shortPropertyEntry(name, description, false, immutable, defaultValue, hidden, false);
+  }
+
   public static PropertyEntry<Integer> integerOptionalPropertyEntry(
       String name, String description, boolean immutable, Integer defaultValue, boolean hidden) {
     return integerPropertyEntry(name, description, false, immutable, defaultValue, hidden, false);
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index acc44b37f8d..39a9459ba57 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -50,6 +50,8 @@ rauschig = "1.2.0"
 mybatis = "3.5.6"
 h2db = "1.4.200"
 kyuubi = "1.8.0"
+kafka = "3.4.0"
+curator = "2.12.0"

 protobuf-plugin = "0.9.2"
 spotless-plugin = '6.11.0'
@@ -152,6 +154,9 @@ minikdc = { group = "org.apache.hadoop", name = "hadoop-minikdc", version.ref =
 immutables-value = { module = "org.immutables:value", version.ref = "immutables-value" }
 commons-cli = { group = "commons-cli", name = "commons-cli", version.ref = "commons-cli" }
 sun-activation = { group = "com.sun.activation", name = "javax.activation", version.ref = "sun-activation-version" }
+kafka-clients = { group = "org.apache.kafka", name = "kafka-clients", version.ref = "kafka" }
+kafka = { group = "org.apache.kafka", name = "kafka_2.12", version.ref = "kafka" }
+curator-test = { group = "org.apache.curator", name = "curator-test", version.ref = "curator" }
 selenium = { group = "org.seleniumhq.selenium", name = "selenium-java", version.ref = "selenium" }
 rauschig = { group = "org.rauschig", name = "jarchivelib", version.ref = "rauschig" }