Skip to content

Commit

Permalink
KAFKA-16828 RackAwareTaskAssignorTest failed (apache#16044)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat authored and chiacyu committed Jun 1, 2024
1 parent 8f78705 commit cf152f0
Showing 1 changed file with 3 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,7 @@ public synchronized boolean canEnableRackAwareAssignor() {
}

// Visible for testing. This method also checks if all TopicPartitions exist in cluster
public boolean populateTopicsToDescribe(final Set<String> topicsToDescribe,
final boolean changelog) {
return populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelog, topicsToDescribe, racksForPartition);
}

public static boolean populateTopicsToDescribe(final Cluster fullMetadata,
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final boolean changelog,
final Set<String> topicsToDescribe,
final Map<TopicPartition, Set<String>> racksForPartition) {
boolean populateTopicsToDescribe(final Set<String> topicsToDescribe, final boolean changelog) {
if (changelog) {
// Changelog topics are not in metadata, we need to describe them
changelogPartitionsForTask.values().stream().flatMap(Collection::stream).forEach(tp -> topicsToDescribe.add(tp.topic()));
Expand Down Expand Up @@ -177,26 +167,17 @@ public static boolean populateTopicsToDescribe(final Cluster fullMetadata,
return true;
}

private boolean validateTopicPartitionRack(final boolean changelogTopics) {
return validateTopicPartitionRack(fullMetadata, internalTopicManager, changelogPartitionsForTask, partitionsForTask, changelogTopics, racksForPartition);
}

/**
* This function populates the {@param racksForPartition} parameter passed into the function by using both
* the {@code Cluster} metadata as well as the {@param internalTopicManager} for topics that have stale
* information.
*
* @return whether the operation successfully completed and the rack information is valid.
*/
public static boolean validateTopicPartitionRack(final Cluster fullMetadata,
final InternalTopicManager internalTopicManager,
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final boolean changelogTopics,
final Map<TopicPartition, Set<String>> racksForPartition) {
private boolean validateTopicPartitionRack(final boolean changelogTopics) {
// Make sure rackId exist for all TopicPartitions needed
final Set<String> topicsToDescribe = new HashSet<>();
if (!populateTopicsToDescribe(fullMetadata, changelogPartitionsForTask, partitionsForTask, changelogTopics, topicsToDescribe, racksForPartition)) {
if (!populateTopicsToDescribe(topicsToDescribe, changelogTopics)) {
return false;
}

Expand Down

0 comments on commit cf152f0

Please sign in to comment.