From e64fdb22994a3539a63173571330aa3b2b517562 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Sat, 1 Jun 2024 07:13:24 +0000 Subject: [PATCH 1/3] KAFKA-16814 KRaft broker cannot startup when `partition.metadata` is missing --- .../src/main/scala/kafka/log/LogManager.scala | 11 +- .../server/LogManagerIntegrationTest.java | 107 ++++++++++++++++++ .../scala/unit/kafka/log/LogManagerTest.scala | 7 ++ 3 files changed, 122 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/kafka/server/LogManagerIntegrationTest.java diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 3bc6533117cba..5d4320874d95b 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1604,11 +1604,16 @@ object LogManager { newTopicsImage: TopicsImage, log: UnifiedLog ): Boolean = { - val topicId = log.topicId.getOrElse { - throw new RuntimeException(s"The log dir $log does not have a topic ID, " + - "which is not allowed when running in KRaft mode.") + if (log.topicId.isEmpty) { + // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing + // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always + // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as + // a stray log directory. + info(s"The topicId does not exist in $log, treat it as stray log dir") + return true } + val topicId = log.topicId.get val partitionId = log.topicPartition.partition() Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java new file mode 100644 index 0000000000000..8fe584a0c981c --- /dev/null +++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.test.junit.RaftClusterInvocationContext; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@ExtendWith(value = ClusterTestExtensions.class) +@Tag("integration") +public class LogManagerIntegrationTest { + private final ClusterInstance cluster; + + public LogManagerIntegrationTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest(types = {Type.KRAFT}) + public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { + RaftClusterInvocationContext.RaftClusterInstance raftInstance = (RaftClusterInvocationContext.RaftClusterInstance) cluster; + + try (Admin admin = cluster.createAdminClient()) { + admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 1))); + } + + raftInstance.getUnderlying().brokers().get(0).shutdown(); + // delete partition.metadata file here to simulate the scenario that partition.metadata not flush to disk yet + raftInstance.getUnderlying().brokers().get(0) + .logManager().getLog(new TopicPartition("foo", 0), false).get().partitionMetadataFile().get().delete(); + raftInstance.getUnderlying().brokers().get(0).startup(); + assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException()); + + // make sure topic still work fine + Map producerConfigs = new HashMap<>(); + producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + try (Producer producer = new KafkaProducer<>(producerConfigs)) { + producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get(); + producer.flush(); + } + + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Consumer consumer = new KafkaConsumer<>(consumerConfigs)) { + consumer.assign(Collections.singletonList(new TopicPartition("foo", 0))); + consumer.seekToBeginning(Collections.singletonList(new TopicPartition("foo", 0))); + List values = new ArrayList<>(); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + for (ConsumerRecord record : records) { + values.add(record.value()); + } + assertEquals(1, values.size()); + assertEquals("bar", values.get(0)); + } + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 13ce4d28e9d3d..6e92094007152 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -1286,6 +1286,13 @@ class LogManagerTest { onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log))) } + @Test + def testIsStrayKraftMissingTopicId(): Unit = { + val log = Mockito.mock(classOf[UnifiedLog]) + Mockito.when(log.topicId).thenReturn(Option.empty) + assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(Seq()), log)) + } + @Test def testFindStrayReplicasInEmptyLAIR(): Unit = { val onDisk = Seq(foo0, foo1, bar0, bar1, baz0, baz1, baz2, quux0) From fbaa648abc1336aa62e059f0ba17432f5958334c Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Mon, 3 Jun 2024 14:31:32 +0000 Subject: [PATCH 2/3] Address comments --- .../server/LogManagerIntegrationTest.java | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java index 8fe584a0c981c..52af0e3fa5201 100644 --- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java +++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java @@ -33,8 +33,11 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; @@ -45,11 +48,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") @@ -60,20 +65,43 @@ public LogManagerIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } - @ClusterTest(types = {Type.KRAFT}) + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { - RaftClusterInvocationContext.RaftClusterInstance raftInstance = (RaftClusterInvocationContext.RaftClusterInstance) cluster; + RaftClusterInvocationContext.RaftClusterInstance raftInstance = + (RaftClusterInvocationContext.RaftClusterInstance) cluster; try (Admin admin = cluster.createAdminClient()) { - admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 1))); + admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get(); } + cluster.waitForTopic("foo", 1); + + Optional partitionMetadataFile = Optional.ofNullable( + raftInstance.getUnderlying().brokers().get(0).logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .partitionMetadataFile().getOrElse(null)); + assertTrue(partitionMetadataFile.isPresent()); raftInstance.getUnderlying().brokers().get(0).shutdown(); + try (Admin admin = cluster.createAdminClient()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(Collections.singletonList("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).isr().size() == 2; + }, "isr size is not shrink to 2"); + } + // delete partition.metadata file here to simulate the scenario that partition.metadata not flush to disk yet - raftInstance.getUnderlying().brokers().get(0) - .logManager().getLog(new TopicPartition("foo", 0), false).get().partitionMetadataFile().get().delete(); + partitionMetadataFile.get().delete(); raftInstance.getUnderlying().brokers().get(0).startup(); + // make sure there is no error during load logs assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException()); + try (Admin admin = cluster.createAdminClient()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(Collections.singletonList("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).isr().size() == 3; + }, "isr size is not expand to 3"); + } // make sure topic still work fine Map producerConfigs = new HashMap<>(); From 220325cbf431c7fe93aa3773629240da917396a6 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Tue, 4 Jun 2024 13:50:21 +0000 Subject: [PATCH 3/3] Address comments --- core/src/main/scala/kafka/log/LogManager.scala | 4 ++-- .../src/test/java/kafka/server/LogManagerIntegrationTest.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7990596c677f5..45d6ab7908db5 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1608,8 +1608,8 @@ object LogManager { // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as - // a stray log directory. - info(s"The topicId does not exist in $log, treat it as stray log dir") + // a stray log. + info(s"The topicId does not exist in $log, treat it as a stray log") return true } diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java index 52af0e3fa5201..709454beccd6b 100644 --- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java +++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java @@ -54,6 +54,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) @@ -92,6 +93,7 @@ public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOExcept // delete partition.metadata file here to simulate the scenario that partition.metadata not flush to disk yet partitionMetadataFile.get().delete(); + assertFalse(partitionMetadataFile.get().exists()); raftInstance.getUnderlying().brokers().get(0).startup(); // make sure there is no error during load logs assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());