Skip to content

Commit

Permalink
KAFKA-16814 KRaft broker cannot startup when partition.metadata is …
Browse files Browse the repository at this point in the history
…missing
  • Loading branch information
brandboat committed Jun 1, 2024
1 parent fb566e4 commit d7ddbf0
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 3 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1604,11 +1604,16 @@ object LogManager {
newTopicsImage: TopicsImage,
log: UnifiedLog
): Boolean = {
val topicId = log.topicId.getOrElse {
throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
"which is not allowed when running in KRaft mode.")
if (log.topicId.isEmpty) {
// Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
// data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
// flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
// a stray log directory.
info(s"The topicId does not exist in $log, treat it as stray log dir")
return true
}

val topicId = log.topicId.get
val partitionId = log.topicPartition.partition()
Option(newTopicsImage.getPartition(topicId, partitionId)) match {
case Some(partition) =>
Expand Down
107 changes: 107 additions & 0 deletions core/src/test/java/kafka/server/LogManagerIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server;

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.test.junit.RaftClusterInvocationContext;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(value = ClusterTestExtensions.class)
@Tag("integration")
public class LogManagerIntegrationTest {
private final ClusterInstance cluster;

public LogManagerIntegrationTest(ClusterInstance cluster) {
this.cluster = cluster;
}

@ClusterTest(types = {Type.KRAFT})
public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
RaftClusterInvocationContext.RaftClusterInstance raftInstance = (RaftClusterInvocationContext.RaftClusterInstance) cluster;

try (Admin admin = cluster.createAdminClient()) {
admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 1)));
}

raftInstance.getUnderlying().brokers().get(0).shutdown();
// delete partition.metadata file here to simulate the scenario that partition.metadata not flush to disk yet
raftInstance.getUnderlying().brokers().get(0)
.logManager().getLog(new TopicPartition("foo", 0), false).get().partitionMetadataFile().get().delete();
raftInstance.getUnderlying().brokers().get(0).startup();
assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());

// make sure topic still work fine
Map<String, Object> producerConfigs = new HashMap<>();
producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
producerConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs)) {
producer.send(new ProducerRecord<>("foo", 0, null, "bar")).get();
producer.flush();
}

Map<String, Object> consumerConfigs = new HashMap<>();
consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs)) {
consumer.assign(Collections.singletonList(new TopicPartition("foo", 0)));
consumer.seekToBeginning(Collections.singletonList(new TopicPartition("foo", 0)));
List<String> values = new ArrayList<>();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
for (ConsumerRecord<String, String> record : records) {
values.add(record.value());
}
assertEquals(1, values.size());
assertEquals("bar", values.get(0));
}
}
}
7 changes: 7 additions & 0 deletions core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,13 @@ class LogManagerTest {
onDisk.foreach(log => assertEquals(expectedStrays.contains(log.topicPartition), LogManager.isStrayKraftReplica(0, image, log)))
}

@Test
def testIsStrayKraftMissingTopicId(): Unit = {
val log = Mockito.mock(classOf[UnifiedLog])
Mockito.when(log.topicId).thenReturn(Option.empty)
assertTrue(LogManager.isStrayKraftReplica(0, topicsImage(Seq()), log))
}

@Test
def testFindStrayReplicasInEmptyLAIR(): Unit = {
val onDisk = Seq(foo0, foo1, bar0, bar1, baz0, baz1, baz2, quux0)
Expand Down

0 comments on commit d7ddbf0

Please sign in to comment.