From d3936365bf0fc7df86c2632a576081474d923e08 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe <cmccabe@apache.org>
Date: Fri, 13 Sep 2024 09:18:24 -0700
Subject: [PATCH] KAFKA-16468: verify that migrating brokers provide their
 inter.broker.listener (#17159)

When brokers undergoing ZK migration register with the controller, it should verify that they have
provided a way to contact them via their inter.broker.listener. Otherwise the migration will fail
later on with a more confusing error message.

Reviewers: David Arthur <mumrah@gmail.com>
---
 .../scala/kafka/server/ControllerServer.scala |  1 +
 .../BrokerRegistrationRequestTest.scala       |  8 +++++
 .../controller/ClusterControlManager.java     | 25 ++++++++++++++--
 .../kafka/controller/QuorumController.java    | 13 +++++++--
 .../controller/ClusterControlManagerTest.java | 29 +++++++++++++++++++
 5 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 0e4321cffb38e..0905215f9a26e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -266,6 +266,7 @@ class ControllerServer(
           setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
           setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
           setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
+          setInterBrokerListenerName(config.interBrokerListenerName.value()).
           setEligibleLeaderReplicasEnabled(config.elrEnabled)
       }
       controller = controllerBuilder.build()
diff --git a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index f60aac80c9034..cd05f19a08e0f 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -37,6 +37,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.extension.ExtendWith
 
+import java.util
 import java.util.Collections
 import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 
@@ -122,6 +123,13 @@ class BrokerRegistrationRequestTest {
       .setIncarnationId(Uuid.randomUuid())
       .setIsMigratingZkBroker(zkEpoch.isDefined)
       .setFeatures(features)
+      .setListeners(new BrokerRegistrationRequestData.ListenerCollection(util.Arrays.asList(
+        new BrokerRegistrationRequestData.Listener().
+          setName("EXTERNAL").
+          setHost("example.com").
+          setPort(8082).
+          setSecurityProtocol(SecurityProtocol.PLAINTEXT.id))
+            .iterator()))
 
     val resp = sendAndReceive[BrokerRegistrationRequest, BrokerRegistrationResponse](
       channelManager, new BrokerRegistrationRequest.Builder(req), 30000)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 77ffd1036bede..b0b43f05a4c75 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -93,6 +93,7 @@ static class Builder {
         private FeatureControlManager featureControl = null;
         private boolean zkMigrationEnabled = false;
         private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;
+        private String interBrokerListenerName = "PLAINTEXT";
 
         Builder setLogContext(LogContext logContext) {
             this.logContext = logContext;
@@ -139,6 +140,11 @@ Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncle
             return this;
         }
 
+        Builder setInterBrokerListenerName(String interBrokerListenerName) {
+            this.interBrokerListenerName = interBrokerListenerName;
+            return this;
+        }
+
         ClusterControlManager build() {
             if (logContext == null) {
                 logContext = new LogContext();
@@ -166,7 +172,8 @@ ClusterControlManager build() {
                 replicaPlacer,
                 featureControl,
                 zkMigrationEnabled,
-                brokerUncleanShutdownHandler
+                brokerUncleanShutdownHandler,
+                interBrokerListenerName
             );
         }
     }
@@ -260,6 +267,11 @@ boolean check() {
 
     private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
 
+    /**
+     * The statically configured inter-broker listener name.
+     */
+    private final String interBrokerListenerName;
+
     /**
      * Maps controller IDs to controller registrations.
      */
@@ -279,7 +291,8 @@ private ClusterControlManager(
         ReplicaPlacer replicaPlacer,
         FeatureControlManager featureControl,
         boolean zkMigrationEnabled,
-        BrokerUncleanShutdownHandler brokerUncleanShutdownHandler
+        BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
+        String interBrokerListenerName
     ) {
         this.logContext = logContext;
         this.clusterId = clusterId;
@@ -296,6 +309,7 @@ private ClusterControlManager(
         this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
         this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
+        this.interBrokerListenerName = interBrokerListenerName;
     }
 
     ReplicaPlacer replicaPlacer() {
@@ -377,6 +391,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
                 "brokers until the metadata migration is complete.");
         }
 
+        if (request.isMigratingZkBroker()) {
+            if (request.listeners().find(interBrokerListenerName) == null) {
+                throw new InvalidRegistrationException("Broker does not have the current inter.broker.listener " +
+                        interBrokerListenerName);
+            }
+        }
+
         if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
             if (request.logDirs().isEmpty()) {
                 throw new InvalidRegistrationException("No directories specified in request");
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 64723b86c0da6..74451716277c3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -225,6 +225,7 @@ public static class Builder {
         private long delegationTokenExpiryTimeMs;
         private long delegationTokenExpiryCheckIntervalMs;
         private long uncleanLeaderElectionCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
+        private String interBrokerListenerName = "PLAINTEXT";
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -380,6 +381,11 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long uncleanLeaderElectio
             return this;
         }
 
+        public Builder setInterBrokerListenerName(String interBrokerListenerName) {
+            this.interBrokerListenerName = interBrokerListenerName;
+            return this;
+        }
+
         public QuorumController build() throws Exception {
             if (raftClient == null) {
                 throw new IllegalStateException("You must set a raft client.");
@@ -437,7 +443,8 @@ public QuorumController build() throws Exception {
                     delegationTokenExpiryTimeMs,
                     delegationTokenExpiryCheckIntervalMs,
                     eligibleLeaderReplicasEnabled,
-                    uncleanLeaderElectionCheckIntervalMs
+                    uncleanLeaderElectionCheckIntervalMs,
+                    interBrokerListenerName
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -1868,7 +1875,8 @@ private QuorumController(
         long delegationTokenExpiryTimeMs,
         long delegationTokenExpiryCheckIntervalMs,
         boolean eligibleLeaderReplicasEnabled,
-        long uncleanLeaderElectionCheckIntervalMs
+        long uncleanLeaderElectionCheckIntervalMs,
+        String interBrokerListenerName
     ) {
         this.nonFatalFaultHandler = nonFatalFaultHandler;
         this.fatalFaultHandler = fatalFaultHandler;
@@ -1925,6 +1933,7 @@ private QuorumController(
             setFeatureControlManager(featureControl).
             setZkMigrationEnabled(zkMigrationEnabled).
             setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
+            setInterBrokerListenerName(interBrokerListenerName).
             build();
         this.producerIdControlManager = new ProducerIdControlManager.Builder().
             setLogContext(logContext).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 0646d4aaa5647..786891e695bcd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -852,4 +852,33 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
                     clusterControl.brokerRegistrations().get(1).epoch());
         }
     }
+
+    @Test
+    public void testRegistrationWithIncorrectInterBrokerListenerName() {
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
+                setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+                setFeatureControlManager(new FeatureControlManager.Builder().build()).
+                setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
+                setInterBrokerListenerName("INTERNAL").
+                setZkMigrationEnabled(true).
+                build();
+        clusterControl.activate();
+        assertEquals("Broker does not have the current inter.broker.listener INTERNAL",
+            assertThrows(InvalidRegistrationException.class,
+                () -> clusterControl.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(1).
+                        setClusterId(clusterControl.clusterId()).
+                        setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
+                        setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))).
+                        setIsMigratingZkBroker(true).
+                        setListeners(new BrokerRegistrationRequestData.ListenerCollection(Collections.singleton(
+                            new BrokerRegistrationRequestData.Listener().
+                                setName("PLAINTEXT").
+                                setHost("example.com").
+                                setPort(9092).
+                                setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())),
+                        111,
+                    new FinalizedControllerFeatures(Collections.emptyMap(), 100L))).getMessage());
+    }
 }