From cf152f0dc82cce21eb213fa6c73216c5f08c62e6 Mon Sep 17 00:00:00 2001 From: "Kuan-Po (Cooper) Tseng" Date: Fri, 24 May 2024 05:25:53 +0800 Subject: [PATCH] KAFKA-16828 RackAwareTaskAssignorTest failed (#16044) Reviewers: Chia-Ping Tsai --- .../assignment/RackAwareTaskAssignor.java | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index b10b51f7f15b9..34c137ec1fb41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -134,17 +134,7 @@ public synchronized boolean canEnableRackAwareAssignor() { } // Visible for testing. This method also checks if all TopicPartitions exist in cluster - public boolean populateTopicsToDescribe(final Set topicsToDescribe, - final boolean changelog) { - return populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelog, topicsToDescribe, racksForPartition); - } - - public static boolean populateTopicsToDescribe(final Cluster fullMetadata, - final Map> changelogPartitionsForTask, - final Map> partitionsForTask, - final boolean changelog, - final Set topicsToDescribe, - final Map> racksForPartition) { + boolean populateTopicsToDescribe(final Set topicsToDescribe, final boolean changelog) { if (changelog) { // Changelog topics are not in metadata, we need to describe them changelogPartitionsForTask.values().stream().flatMap(Collection::stream).forEach(tp -> topicsToDescribe.add(tp.topic())); @@ -177,10 +167,6 @@ public static boolean populateTopicsToDescribe(final Cluster fullMetadata, return true; } - private boolean validateTopicPartitionRack(final boolean changelogTopics) { - return validateTopicPartitionRack(fullMetadata, internalTopicManager, changelogPartitionsForTask, partitionsForTask, changelogTopics, racksForPartition); - } - /** * This function populates the {@param racksForPartition} parameter passed into the function by using both * the {@code Cluster} metadata as well as the {@param internalTopicManager} for topics that have stale @@ -188,15 +174,10 @@ private boolean validateTopicPartitionRack(final boolean changelogTopics) { * * @return whether the operation successfully completed and the rack information is valid. */ - public static boolean validateTopicPartitionRack(final Cluster fullMetadata, - final InternalTopicManager internalTopicManager, - final Map> changelogPartitionsForTask, - final Map> partitionsForTask, - final boolean changelogTopics, - final Map> racksForPartition) { + private boolean validateTopicPartitionRack(final boolean changelogTopics) { // Make sure rackId exist for all TopicPartitions needed final Set topicsToDescribe = new HashSet<>(); - if (!populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelogTopics, topicsToDescribe, racksForPartition)) { + if (!populateTopicsToDescribe(topicsToDescribe, changelogTopics)) { return false; }