diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index e8e72400d4823..9e1026a5a171f 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -18,8 +18,8 @@ package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{ConfigType, KafkaConfig}
-import kafka.test.ClusterInstance
-import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
+import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
@@ -55,6 +55,26 @@ import java.util.{Properties, UUID}
import scala.collection.Seq
import scala.jdk.CollectionConverters._
+object ZkMigrationIntegrationTest {
+ def addZkBrokerProps(props: Properties): Unit = {
+ props.setProperty("inter.broker.listener.name", "EXTERNAL")
+ props.setProperty("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
+ props.setProperty("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
+ props.setProperty("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+ }
+
+ def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = {
+ Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2).foreach { mv =>
+ val clusterConfig = ClusterConfig.defaultClusterBuilder()
+ .metadataVersion(mv)
+ .brokers(3)
+ .`type`(Type.ZK)
+ .build()
+ addZkBrokerProps(clusterConfig.serverProperties())
+ clusterGenerator.accept(clusterConfig)
+ }
+ }
+}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300)
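For readers new to this test framework: @ClusterTemplate(name), used further down, resolves a static generator method like the one above by name, and each ClusterConfig fed to the ClusterGenerator produces one invocation of the annotated test. A minimal Java sketch of the same pattern (class and method names here are hypothetical; the builder calls mirror the Scala generator above):

    import kafka.test.ClusterConfig;
    import kafka.test.ClusterGenerator;
    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTemplate;
    import kafka.test.annotation.Type;
    import kafka.test.junit.ClusterTestExtensions;
    import org.apache.kafka.server.common.MetadataVersion;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ClusterTestExtensions.class)
    class TemplateSketchTest {
        // Resolved by name from @ClusterTemplate below; one accept() per cluster to generate.
        static void twoMetadataVersions(ClusterGenerator generator) {
            for (MetadataVersion mv : new MetadataVersion[]{
                    MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_6_IV2}) {
                generator.accept(ClusterConfig.defaultClusterBuilder()
                    .metadataVersion(mv)
                    .brokers(3)
                    .type(Type.ZK)
                    .build());
            }
        }

        @ClusterTemplate("twoMetadataVersions")
        void testRunsOncePerGeneratedConfig(ClusterInstance cluster) {
            // cluster.config().metadataVersion() reflects the generating ClusterConfig.
        }
    }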
@@ -308,12 +328,7 @@ class ZkMigrationIntegrationTest {
}
// SCRAM and Quota are intermixed. Test Quota Only here
- @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
- new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
- new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
- new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
- new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
- ))
+ @ClusterTemplate("zkClustersForAllMigrationVersions")
def testDualWrite(zkCluster: ClusterInstance): Unit = {
// Create a topic in ZK mode
var admin = zkCluster.createAdminClient()
@@ -334,7 +349,7 @@ class ZkMigrationIntegrationTest {
val clusterId = zkCluster.clusterId()
val kraftCluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
- setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
+ setBootstrapMetadataVersion(zkCluster.config().metadataVersion()).
setClusterId(Uuid.fromString(clusterId)).
setNumBrokerNodes(0).
setNumControllerNodes(1).build())
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 4777614e33671..5b437cec754ac 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -151,7 +151,6 @@
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME;
-import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.COMPLETES_IN_TRANSACTION;
import static org.apache.kafka.controller.QuorumController.ControllerOperationFlag.RUNS_IN_PREMIGRATION;
@@ -684,21 +683,16 @@ enum ControllerOperationFlag {
* Operations without this flag will always return NOT_CONTROLLER when invoked in premigration
* mode.
*
- * In pre-migration mode, we are still waiting to load the metadata from Apache
- * ZooKeeper into the metadata log. Therefore, the metadata log is mostly empty,
- * even though the cluster really does have metadata. Very few operations should
- * use this flag.
- */
- RUNS_IN_PREMIGRATION,
-
- /**
- * This flag signifies that an event will be completed even if it is part of an unfinished transaction.
- * This is needed for metadata transactions so that external callers can add records to a transaction
- * and still use the returned future. One example usage of this flag is the batches of migrations records.
- * The migration driver needs to wait on each submitted batch to avoid overwhelming the controller queue
- * with events, so it needs events to be completed based on the committed (i.e., not stable) offset.
+ * In pre-migration mode, we are still waiting to load the metadata from Apache ZooKeeper into
+ * the metadata log. Therefore, the metadata log is mostly empty, even though the cluster really
+ * does have metadata.
+ *
+ * Events using this flag will be completed even if a transaction is ongoing. Pre-migration
+ * events will be completed using the unstable (committed) offset rather than the stable offset.
+ *
+ * In practice, very few operations should use this flag.
*/
- COMPLETES_IN_TRANSACTION
+ RUNS_IN_PREMIGRATION
}
interface ControllerWriteOperation<T> {
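Concretely, each purgatory maps a log offset to futures waiting for the log to reach that offset. While a metadata transaction is open, the stable offset cannot advance past the transaction's first record, so events parked on the stable queue would wait for the whole migration to finish; the unstable queue completes on the committed offset instead, which keeps premigration events (migration batches, ZK broker heartbeats) moving. A self-contained stand-in, not the controller's actual DeferredEventQueue (simplified to one future per offset):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.OptionalLong;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    // Simplified deferred-event purgatory: futures keyed by the offset they
    // are waiting on. The controller keeps two of these; one is drained at
    // the stable offset, the other at the committed offset.
    final class PurgatorySketch {
        private final TreeMap<Long, CompletableFuture<Void>> pending = new TreeMap<>();

        void add(long offset, CompletableFuture<Void> future) {
            pending.put(offset, future);
        }

        // Complete every future waiting on an offset <= the offset just reached.
        void completeUpTo(long offset) {
            Iterator<Map.Entry<Long, CompletableFuture<Void>>> iter =
                pending.headMap(offset, true).entrySet().iterator();
            while (iter.hasNext()) {
                iter.next().getValue().complete(null);
                iter.remove();
            }
        }

        OptionalLong highestPendingOffset() {
            return pending.isEmpty() ? OptionalLong.empty() : OptionalLong.of(pending.lastKey());
        }
    }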
@@ -779,7 +773,14 @@ public void run() throws Exception {
// If the operation did not return any records, then it was actually just
// a read after all, and not a read + write. However, this read was done
// from the latest in-memory state, which might contain uncommitted data.
- OptionalLong maybeOffset = deferredEventQueue.highestPendingOffset();
+ // If we are in pre-migration mode and the operation is flagged to run
+ // there, let it use the unstable purgatory so that it can complete sooner.
+ OptionalLong maybeOffset;
+ if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
+ maybeOffset = deferredUnstableEventQueue.highestPendingOffset();
+ } else {
+ maybeOffset = deferredEventQueue.highestPendingOffset();
+ }
if (!maybeOffset.isPresent()) {
// If the purgatory is empty, there are no pending operations and no
// uncommitted state. We can complete immediately.
@@ -841,7 +842,7 @@ public Long apply(List<ApiMessageAndVersion> records) {
// Remember the latest offset and future if it is not already completed
if (!future.isDone()) {
- if (flags.contains(COMPLETES_IN_TRANSACTION)) {
+ if (featureControl.inPreMigrationMode() && flags.contains(RUNS_IN_PREMIGRATION)) {
deferredUnstableEventQueue.add(resultAndOffset.offset(), this);
} else {
deferredEventQueue.add(resultAndOffset.offset(), this);
@@ -961,9 +962,7 @@ <T> CompletableFuture<T> appendWriteEvent(
}
class MigrationRecordConsumer implements ZkRecordConsumer {
- private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(
- RUNS_IN_PREMIGRATION, COMPLETES_IN_TRANSACTION
- );
+ private final EnumSet<ControllerOperationFlag> eventFlags = EnumSet.of(RUNS_IN_PREMIGRATION);
private volatile OffsetAndEpoch highestMigrationRecordOffset;
@@ -1313,7 +1312,7 @@ private void rescheduleMaybeFenceStaleBrokers() {
rescheduleMaybeFenceStaleBrokers();
return result;
},
- EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME, RUNS_IN_PREMIGRATION));
+ EnumSet.of(DOES_NOT_UPDATE_QUEUE_TIME));
}
private void cancelMaybeFenceReplicas() {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 09516fd5c11a9..aa60390cc06d4 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -688,7 +688,7 @@ public void run() throws Exception {
transitionTo(MigrationDriverState.SYNC_KRAFT_TO_ZK);
} catch (Throwable t) {
MigrationManifest partialManifest = manifestBuilder.build();
- log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest);
+ log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest, t);
zkRecordConsumer.abortMigration(); // This terminates the controller via fatal fault handler
super.handleException(t);
}
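The added argument matters because of an SLF4J convention: when the trailing argument of a logging call is a Throwable that is not consumed by a {} placeholder, it is rendered as a stack trace. Before this fix, the abort reason was silently dropped. A minimal illustration (class and method names hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
        private static final Logger log = LoggerFactory.getLogger(LoggingSketch.class);

        void abortExample(Object partialManifest, Throwable t) {
            // One placeholder, two trailing args: partialManifest fills the {},
            // and t, being a Throwable in the last position, is logged as a
            // full stack trace rather than ignored.
            log.error("Aborting the metadata migration from ZooKeeper to KRaft. {}.", partialManifest, t);
        }
    }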
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 1616328fb9caa..3a70312ca1c7a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -117,6 +117,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1491,6 +1492,7 @@ public void testActivationRecordsPartialTransactionNoSupport() {
setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0)
));
+
@Test
public void testFailoverDuringMigrationTransaction() throws Exception {
try (
@@ -1534,4 +1536,62 @@ public void testFailoverDuringMigrationTransaction() throws Exception {
}
}
+
+ @ParameterizedTest
+ @EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", "IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
+ public void testBrokerHeartbeatDuringMigration(MetadataVersion metadataVersion) throws Exception {
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
+ ) {
+ QuorumControllerTestEnv.Builder controlEnvBuilder = new QuorumControllerTestEnv.Builder(logEnv).
+ setControllerBuilderInitializer(controllerBuilder ->
+ controllerBuilder
+ .setZkMigrationEnabled(true)
+ .setMaxIdleIntervalNs(OptionalLong.of(TimeUnit.MILLISECONDS.toNanos(100)))
+ ).
+ setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test"));
+ QuorumControllerTestEnv controlEnv = controlEnvBuilder.build();
+ QuorumController active = controlEnv.activeController(true);
+
+ // Register a ZK broker
+ BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
+ new BrokerRegistrationRequestData().
+ setBrokerId(0).
+ setRack(null).
+ setClusterId(active.clusterId()).
+ setIsMigratingZkBroker(true).
+ setFeatures(brokerFeatures(metadataVersion, metadataVersion)).
+ setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
+ setListeners(new ListenerCollection(Arrays.asList(new Listener().
+ setName("PLAINTEXT").setHost("localhost").
+ setPort(9092)).iterator()))).get();
+
+ // Start migration
+ ZkRecordConsumer migrationConsumer = active.zkRecordConsumer();
+ migrationConsumer.beginMigration().get(30, TimeUnit.SECONDS);
+
+ // Interleave migration batches with heartbeats. Ensure the heartbeat events use the correct
+ // offset when adding to the purgatory. Otherwise, we get errors like:
+ // There is already a deferred event with offset 292. We should not add one with an offset of 241 which is lower than that.
+ for (int i = 0; i < 100; i++) {
+ Uuid topicId = Uuid.randomUuid();
+ String topicName = "testBrokerHeartbeatDuringMigration" + i;
+ Future<?> migrationFuture = migrationConsumer.acceptBatch(
+ Arrays.asList(
+ new ApiMessageAndVersion(new TopicRecord().setTopicId(topicId).setName(topicName), (short) 0),
+ new ApiMessageAndVersion(new PartitionRecord().setTopicId(topicId).setPartitionId(0).setIsr(Arrays.asList(0, 1, 2)), (short) 0)));
+ active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+ setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
+ setCurrentMetadataOffset(100000L + i));
+ migrationFuture.get();
+ }
+
+ // Ensure that we can complete a heartbeat even though we leave the migration transaction hanging
+ assertEquals(new BrokerHeartbeatReply(true, false, false, false),
+ active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
+ setWantFence(false).setBrokerEpoch(reply.epoch()).setBrokerId(0).
+ setCurrentMetadataOffset(100100L)).get());
+
+ }
+ }
}
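As a usage note, the new parameterized test can be run on its own with Kafka's standard Gradle test filter, for example: ./gradlew metadata:test --tests org.apache.kafka.controller.QuorumControllerTest (filter pattern as documented in the project README; the exact module task name is an assumption here).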