diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index af2df8b65221..119b422c4c58 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -381,6 +381,10 @@ private void updateIsrMembers(Uuid topicId, int partitionId, int[] prevIsr, int[
         // Check the topic names.
         validateNewTopicNames(topicErrors, request.topics());
 
+        // Identify topics that already exist and mark them with the appropriate error
+        request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name()))
+                .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS)));
+
         // Verify that the configurations for the new topics are OK, and figure out what
         // ConfigRecords should be created.
         Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges =
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 5411a3f25ae3..f2b8613b75c5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.controller;
 
-import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
@@ -111,6 +110,13 @@ public void testCreateTopics() throws Exception {
             new int[] {1, 2, 0}, null, null, 1, 0), replicationControl.getPartition(
                 ((TopicRecord) result2.records().get(0).message()).topicId(), 0));
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request);
+        CreateTopicsResponseData expectedResponse3 = new CreateTopicsResponseData();
+        expectedResponse3.topics().add(new CreatableTopicResult().setName("foo").
+            setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
+            setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
+        assertEquals(expectedResponse3, result3.response());
     }
 
     @Test
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 1a4f296eb2a7..b1f908ddcf3b 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -16,6 +16,7 @@
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
+from kafkatest.services.kafka import quorum
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
 
@@ -46,8 +47,8 @@ def __init__(self, test_context):
         self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
 
     @cluster(num_nodes=8)
-    @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False])
-    def test_streams(self, processing_guarantee, crash):
+    @matrix(processing_guarantee=['at_least_once', 'exactly_once', 'exactly_once_beta'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade)
+    def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
         processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
         processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
         processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)