diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index d7599e569ab25..45d6ab7908db5 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1604,11 +1604,16 @@ object LogManager { newTopicsImage: TopicsImage, log: UnifiedLog ): Boolean = { - val topicId = log.topicId.getOrElse { - throw new RuntimeException(s"The log dir $log does not have a topic ID, " + - "which is not allowed when running in KRaft mode.") + if (log.topicId.isEmpty) { + // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing + // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always + // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as + // a stray log. + info(s"The topicId does not exist in $log, treat it as a stray log") + return true } + val topicId = log.topicId.get val partitionId = log.topicPartition.partition() Option(newTopicsImage.getPartition(topicId, partitionId)) match { case Some(partition) => diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java new file mode 100644 index 0000000000000..709454beccd6b --- /dev/null +++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.test.junit.RaftClusterInvocationContext; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.storage.internals.checkpoint.PartitionMetadataFile; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@Tag("integration") +public class LogManagerIntegrationTest { + private final ClusterInstance cluster; + + public LogManagerIntegrationTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 3) + public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException { + RaftClusterInvocationContext.RaftClusterInstance raftInstance = + (RaftClusterInvocationContext.RaftClusterInstance) cluster; + + try (Admin admin = cluster.createAdminClient()) { + admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 3))).all().get(); + } + cluster.waitForTopic("foo", 1); + + Optional partitionMetadataFile = Optional.ofNullable( + raftInstance.getUnderlying().brokers().get(0).logManager() + .getLog(new TopicPartition("foo", 0), false).get() + .partitionMetadataFile().getOrElse(null)); + assertTrue(partitionMetadataFile.isPresent()); + + raftInstance.getUnderlying().brokers().get(0).shutdown(); + try (Admin admin = cluster.createAdminClient()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(Collections.singletonList("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).isr().size() == 2; + }, "isr size is not shrink to 2"); + } + + // delete partition.metadata file here to simulate the scenario that partition.metadata not flush to disk yet + partitionMetadataFile.get().delete(); + assertFalse(partitionMetadataFile.get().exists()); + raftInstance.getUnderlying().brokers().get(0).startup(); + // make sure there is no error during load logs + assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException()); + try (Admin admin = cluster.createAdminClient()) { + TestUtils.waitForCondition(() -> { + List partitionInfos = admin.describeTopics(Collections.singletonList("foo")) + .topicNameValues().get("foo").get().partitions(); + return partitionInfos.get(0).isr().size() == 3; + }, "isr size is not expand to 3"); + } + + // make sure topic still work fine + Map producerConfigs = new HashMap<>(); + producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + try (Producer producer = new KafkaProducer<>(producerConfigs)) { + producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get(); + producer.flush(); + } + + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Consumer consumer = new KafkaConsumer<>(consumerConfigs)) { + consumer.assign(Collections.singletonList(new TopicPartition("foo", 0))); + consumer.seekToBeginning(Collections.singletonList(new TopicPartition("foo", 0))); + List values = new ArrayList<>(); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + for (ConsumerRecord record : records) { + values.add(record.value()); + } + assertEquals(1, values.size()); + assertEquals("bar", values.get(0)); + } + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 13ce4d28e9d3d..6e92094007152 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -1286,6 +1286,13 @@ class LogManagerTest { onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log))) } + @Test + def testIsStrayKraftMissingTopicId(): Unit = { + val log = Mockito.mock(classOf[UnifiedLog]) + Mockito.when(log.topicId).thenReturn(Option.empty) + assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(Seq()), log)) + } + @Test def testFindStrayReplicasInEmptyLAIR(): Unit = { val onDisk = Seq(foo0, foo1, bar0, bar1, baz0, baz1, baz2, quux0)