Skip to content

Commit

Permalink
KAFKA-16468: verify that migrating brokers provide their inter.broker…
Browse files Browse the repository at this point in the history
….listener (#17159)

When brokers undergoing ZK migration register with the controller, it should verify that they have
provided a way to contact them via their inter.broker.listener. Otherwise the migration will fail
later on with a more confusing error message.

Reviewers: David Arthur <[email protected]>
  • Loading branch information
cmccabe authored Sep 13, 2024
1 parent 420f69a commit d393636
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value()).
setEligibleLeaderReplicasEnabled(config.elrEnabled)
}
controller = controllerBuilder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith

import java.util
import java.util.Collections
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

Expand Down Expand Up @@ -122,6 +123,13 @@ class BrokerRegistrationRequestTest {
.setIncarnationId(Uuid.randomUuid())
.setIsMigratingZkBroker(zkEpoch.isDefined)
.setFeatures(features)
.setListeners(new BrokerRegistrationRequestData.ListenerCollection(util.Arrays.asList(
new BrokerRegistrationRequestData.Listener().
setName("EXTERNAL").
setHost("example.com").
setPort(8082).
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id))
.iterator()))

val resp = sendAndReceive[BrokerRegistrationRequest, BrokerRegistrationResponse](
channelManager, new BrokerRegistrationRequest.Builder(req), 30000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ static class Builder {
private FeatureControlManager featureControl = null;
private boolean zkMigrationEnabled = false;
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;
private String interBrokerListenerName = "PLAINTEXT";

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
Expand Down Expand Up @@ -139,6 +140,11 @@ Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncle
return this;
}

Builder setInterBrokerListenerName(String interBrokerListenerName) {
this.interBrokerListenerName = interBrokerListenerName;
return this;
}

ClusterControlManager build() {
if (logContext == null) {
logContext = new LogContext();
Expand Down Expand Up @@ -166,7 +172,8 @@ ClusterControlManager build() {
replicaPlacer,
featureControl,
zkMigrationEnabled,
brokerUncleanShutdownHandler
brokerUncleanShutdownHandler,
interBrokerListenerName
);
}
}
Expand Down Expand Up @@ -260,6 +267,11 @@ boolean check() {

private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;

/**
* The statically configured inter-broker listener name.
*/
private final String interBrokerListenerName;

/**
* Maps controller IDs to controller registrations.
*/
Expand All @@ -279,7 +291,8 @@ private ClusterControlManager(
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
boolean zkMigrationEnabled,
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
String interBrokerListenerName
) {
this.logContext = logContext;
this.clusterId = clusterId;
Expand All @@ -296,6 +309,7 @@ private ClusterControlManager(
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
this.interBrokerListenerName = interBrokerListenerName;
}

ReplicaPlacer replicaPlacer() {
Expand Down Expand Up @@ -377,6 +391,13 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
"brokers until the metadata migration is complete.");
}

if (request.isMigratingZkBroker()) {
if (request.listeners().find(interBrokerListenerName) == null) {
throw new InvalidRegistrationException("Broker does not have the current inter.broker.listener " +
interBrokerListenerName);
}
}

if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
if (request.logDirs().isEmpty()) {
throw new InvalidRegistrationException("No directories specified in request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public static class Builder {
private long delegationTokenExpiryTimeMs;
private long delegationTokenExpiryCheckIntervalMs;
private long uncleanLeaderElectionCheckIntervalMs = TimeUnit.MINUTES.toMillis(5);
private String interBrokerListenerName = "PLAINTEXT";

public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
Expand Down Expand Up @@ -380,6 +381,11 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long uncleanLeaderElectio
return this;
}

public Builder setInterBrokerListenerName(String interBrokerListenerName) {
this.interBrokerListenerName = interBrokerListenerName;
return this;
}

public QuorumController build() throws Exception {
if (raftClient == null) {
throw new IllegalStateException("You must set a raft client.");
Expand Down Expand Up @@ -437,7 +443,8 @@ public QuorumController build() throws Exception {
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
eligibleLeaderReplicasEnabled,
uncleanLeaderElectionCheckIntervalMs
uncleanLeaderElectionCheckIntervalMs,
interBrokerListenerName
);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
Expand Down Expand Up @@ -1868,7 +1875,8 @@ private QuorumController(
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
boolean eligibleLeaderReplicasEnabled,
long uncleanLeaderElectionCheckIntervalMs
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName
) {
this.nonFatalFaultHandler = nonFatalFaultHandler;
this.fatalFaultHandler = fatalFaultHandler;
Expand Down Expand Up @@ -1925,6 +1933,7 @@ private QuorumController(
setFeatureControlManager(featureControl).
setZkMigrationEnabled(zkMigrationEnabled).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,4 +852,33 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
clusterControl.brokerRegistrations().get(1).epoch());
}
}

@Test
public void testRegistrationWithIncorrectInterBrokerListenerName() {
ClusterControlManager clusterControl = new ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
setFeatureControlManager(new FeatureControlManager.Builder().build()).
setBrokerUncleanShutdownHandler((brokerId, records) -> { }).
setInterBrokerListenerName("INTERNAL").
setZkMigrationEnabled(true).
build();
clusterControl.activate();
assertEquals("Broker does not have the current inter.broker.listener INTERNAL",
assertThrows(InvalidRegistrationException.class,
() -> clusterControl.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(1).
setClusterId(clusterControl.clusterId()).
setIncarnationId(Uuid.fromString("07OOcU7MQFeSmGAFPP2Zww")).
setLogDirs(Arrays.asList(Uuid.fromString("Vv1gzkM2QpuE-PPrIc6XEw"))).
setIsMigratingZkBroker(true).
setListeners(new BrokerRegistrationRequestData.ListenerCollection(Collections.singleton(
new BrokerRegistrationRequestData.Listener().
setName("PLAINTEXT").
setHost("example.com").
setPort(9092).
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)).iterator())),
111,
new FinalizedControllerFeatures(Collections.emptyMap(), 100L))).getMessage());
}
}

0 comments on commit d393636

Please sign in to comment.