Skip to content

Commit

Permalink
Deal protocol setting and pass all testcases
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Jun 8, 2024
1 parent b5856ff commit 3ced55a
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
7 changes: 4 additions & 3 deletions core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,13 @@ private KafkaConfig createNodeConfig(TestKitNode node) {
controllerNode.metadataDirectory());
}
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id()));
props.put(INTER_BROKER_LISTENER_NAME_CONFIG,
nodes.interBrokerListenerName().value());
props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER");
// props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL,PLAINTEXT");
// Note: we can't accurately set controller.quorum.voters yet, since we don't
// yet know what ports each controller will pick. Set it to a dummy string
// for now as a placeholder.
Expand Down Expand Up @@ -320,12 +321,12 @@ public KafkaClusterTestKit build() throws Exception {

private String listeners(int node) {
if (nodes.isCombined(node)) {
return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
return "PLAINTEXT://localhost:0,CONTROLLER://localhost:0,EXTERNAL://localhost:0";
}
if (nodes.controllerNodes().containsKey(node)) {
return "CONTROLLER://localhost:0";
}
return "EXTERNAL://localhost:0";
return "PLAINTEXT://localhost:0,EXTERNAL://localhost:0";
}

private String roles(int node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(S
static List<ClusterConfig> generate1() {
Map<String, String> serverProp = new HashMap<>();
serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if config name error, no exception throw
// serverProp.put(INTER_BROKER_LISTENER_NAME_CONFIG, "PLAINTEXT");

Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
Map<String, String> infoPerBroker1 = new HashMap<>();
Expand Down Expand Up @@ -1128,13 +1129,6 @@ public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress()
() -> "Reassignment didn't add the second node",
TestUtils.DEFAULT_MAX_WAIT_MS, 100L);


// FIXME: Fail on comnined and raft mode
// ensureConsistentKRaftMetadata()
// TestUtils.ensureConsistentKRaftMetadata(
// JavaConverters.asScalaIteratorConverter(clusterInstance.aliveBrokers().iterator()).asScala().toSeq(),
// scalaControllers.head(), "Timeout waiting for controller metadata propagating to brokers");

// describe the topic and test if it's under-replicated
String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName));
String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator());
Expand Down Expand Up @@ -1267,7 +1261,7 @@ fullyReplicatedReplicaAssignmentMap, new Properties()
TestUtils.DEFAULT_MAX_WAIT_MS, 100L);

kafka.utils.TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions"));
String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions", "--exclude-internal"));
String[] rows = output.split(System.lineSeparator());
assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic)),
"Unexpected output: " + rows[0]);
Expand Down

0 comments on commit 3ced55a

Please sign in to comment.