diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java index 364090df59..439b373370 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/AdminManager.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; +import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; import io.streamnative.pulsar.handlers.kop.utils.KopTopic; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; @@ -136,7 +137,7 @@ public CompletableFuture> createTopicsAsync( return; } admin.topics().createPartitionedTopicAsync(kopTopic.getFullName(), numPartitions, - Map.of("kafkaTopicUUID", UUID.randomUUID().toString())) + Map.of(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME, UUID.randomUUID().toString())) .whenComplete((ignored, e) -> { if (e == null) { if (log.isDebugEnabled()) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 6f33a9d80e..233b7892c2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -64,6 +64,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -539,7 +540,9 @@ private CompletableFuture getTopicMetadataAsync(String topic, return; } if (allowed) { - admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions) + Map properties = + Map.of(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME, UUID.randomUUID().toString()); + admin.topics().createPartitionedTopicAsync(topic, defaultNumPartitions, properties) .whenComplete((__, createException) -> { if (createException == null) { future.complete(TopicAndMetadata.success(topic, defaultNumPartitions)); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java index 06eb600df0..f3b5ffd33c 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java @@ -111,6 +111,7 @@ class AnalyzeResult { @Slf4j public class PartitionLog { + public static final String KAFKA_TOPIC_UUID_PROPERTY_NAME = "kafkaTopicUUID"; private static final String PID_PREFIX = "KOP-PID-PREFIX"; private static final KopLogValidator.CompressionCodec DEFAULT_COMPRESSION = @@ -212,7 +213,7 @@ private CompletableFuture loadTopicProperties() { this.topicProperties = properties; log.info("Topic properties for {} are {}", fullPartitionName, properties); this.entryFormatter = buildEntryFormatter(topicProperties); - this.kafkaTopicUUID = properties.get("kafkaTopicUUID"); + this.kafkaTopicUUID = properties.get(KAFKA_TOPIC_UUID_PROPERTY_NAME); this.producerStateManager = new ProducerStateManager( fullPartitionName, diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java index 847cdb7283..de9d720a95 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -38,6 +39,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.storage.PartitionLog; import io.streamnative.pulsar.handlers.kop.utils.KafkaResponseUtils; import io.streamnative.pulsar.handlers.kop.utils.TopicNameUtils; import java.net.InetSocketAddress; @@ -1024,6 +1026,9 @@ public void testBrokerHandleTopicMetadataRequestAllowAutoTopicCreation(boolean b expectedError = null; assertEquals(1, metadataResponse.topicMetadata().size()); assertEquals(topicName, metadataResponse.topicMetadata().iterator().next().topic()); + Map properties = admin.topics().getProperties(topicName); + assertFalse(properties.isEmpty()); + assertTrue(properties.containsKey(PartitionLog.KAFKA_TOPIC_UUID_PROPERTY_NAME)); } else { // topic does not exist and it is not created expectedError = Errors.UNKNOWN_TOPIC_OR_PARTITION;