diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5a411400ca8a6..76a94fd5af8c4 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1901,6 +1901,7 @@ private Assignment updateTargetAssignment( .withSubscriptionMetadata(subscriptionMetadata) .withSubscriptionType(subscriptionType) .withTargetAssignment(group.targetAssignment()) + .withInvertedTargetAssignment(group.invertedTargetAssignment()) .withTopicsImage(metadataImage.topics()) .addOrUpdateMember(updatedMember.memberId(), updatedMember); TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java index f8d165e1bd124..d6f83df00572c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java @@ -107,8 +107,8 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu */ private final PartitionMovements partitionMovements; - public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { - this.members = assignmentSpec.members(); + public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.members = groupSpec.members(); this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(); this.membersPerTopic = new HashMap<>(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java new file mode 100644 index 0000000000000..296dedb52902b --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpec.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.Map; + +/** + * The group metadata specifications required to compute the target assignment. + */ +public interface GroupSpec { + /** + * @return Member metadata keyed by member Id. + */ + Map members(); + + /** + * @return The group's subscription type. + */ + SubscriptionType subscriptionType(); + + /** + * @return True, if the partition is currently assigned to a member. + * False, otherwise. + */ + boolean isPartitionAssigned(Uuid topicId, int partitionId); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java similarity index 55% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java index ec65dc6decc7d..0194727c7dcf2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImpl.java @@ -16,13 +16,15 @@ */ package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; + import java.util.Map; import java.util.Objects; /** * The assignment specification for a consumer group. */ -public class AssignmentSpec { +public class GroupSpecImpl implements GroupSpec { /** * The member metadata keyed by member Id. */ @@ -33,44 +35,76 @@ public class AssignmentSpec { */ private final SubscriptionType subscriptionType; - public AssignmentSpec( + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. + */ + private final Map> invertedTargetAssignment; + + public GroupSpecImpl( Map members, - SubscriptionType subscriptionType + SubscriptionType subscriptionType, + Map> invertedTargetAssignment ) { Objects.requireNonNull(members); + Objects.requireNonNull(subscriptionType); + Objects.requireNonNull(invertedTargetAssignment); this.members = members; this.subscriptionType = subscriptionType; + this.invertedTargetAssignment = invertedTargetAssignment; } /** - * @return Member metadata keyed by member Id. + * {@inheritDoc} */ + @Override public Map members() { return members; } /** - * @return The group's subscription type. + * {@inheritDoc} */ + @Override public SubscriptionType subscriptionType() { return subscriptionType; } + /** + * {@inheritDoc} + */ + @Override + public boolean isPartitionAssigned(Uuid topicId, int partitionId) { + Map partitionMap = invertedTargetAssignment.get(topicId); + if (partitionMap == null) { + return false; + } + return partitionMap.containsKey(partitionId); + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - AssignmentSpec that = (AssignmentSpec) o; + GroupSpecImpl that = (GroupSpecImpl) o; return subscriptionType == that.subscriptionType && - members.equals(that.members); + members.equals(that.members) && + invertedTargetAssignment.equals(that.invertedTargetAssignment); } @Override public int hashCode() { - return Objects.hash(members, subscriptionType); + int result = members.hashCode(); + result = 31 * result + subscriptionType.hashCode(); + result = 31 * result + invertedTargetAssignment.hashCode(); + return result; } + @Override public String toString() { - return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')'; + return "GroupSpecImpl(members=" + members + + ", subscriptionType=" + subscriptionType + + ", invertedTargetAssignment=" + invertedTargetAssignment + + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java index a14363708e37c..3ea1361d6995a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java @@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment /** * The assignment specification which includes member metadata. */ - private final AssignmentSpec assignmentSpec; + private final GroupSpec groupSpec; /** * The topic and partition metadata describer. @@ -89,18 +89,19 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment * The partitions that still need to be assigned. * Initially this contains all the subscribed topics' partitions. */ - private Set unassignedPartitions; + private final Set unassignedPartitions; /** * The target assignment. */ private final Map targetAssignment; - OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { - this.assignmentSpec = assignmentSpec; + OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; - this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds()); this.potentiallyUnfilledMembers = new HashMap<>(); + this.unassignedPartitions = new HashSet<>(); this.targetAssignment = new HashMap<>(); } @@ -108,9 +109,9 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment * Here's the step-by-step breakdown of the assignment process: * *
  • Compute the quotas of partitions for each member based on the total partitions and member count.
  • - *
  • Initialize unassigned partitions to all the topic partitions and - * remove partitions from the list as and when they are assigned.
  • - *
  • For existing assignments, retain partitions based on the determined quota.
  • + *
  • Initialize unassigned partitions with all the topic partitions that aren't present in the + * current target assignment.
  • + *
  • For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.
  • *
  • Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.
  • *
  • Proceed with a round-robin assignment according to quotas. * For each unassigned partition, locate the first compatible member from the potentially unfilled list.
  • @@ -124,6 +125,9 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { return new GroupAssignment(Collections.emptyMap()); } + // Check if the subscribed topicId is still valid. + // Update unassigned partitions based on the current target assignment + // and topic metadata. for (Uuid topicId : subscribedTopicIds) { int partitionCount = subscribedTopicDescriber.numPartitions(topicId); if (partitionCount == -1) { @@ -131,21 +135,25 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." ); } else { + for (int i = 0; i < partitionCount; i++) { + if (!groupSpec.isPartitionAssigned(topicId, i)) { + unassignedPartitions.add(new TopicIdPartition(topicId, i)); + } + } totalPartitionsCount += partitionCount; } } // The minimum required quota that each member needs to meet for a balanced assignment. // This is the same for all members. - final int numberOfMembers = assignmentSpec.members().size(); + final int numberOfMembers = groupSpec.members().size(); final int minQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; - assignmentSpec.members().keySet().forEach(memberId -> + groupSpec.members().keySet().forEach(memberId -> targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) )); - unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber); potentiallyUnfilledMembers = assignStickyPartitions(minQuota); unassignedPartitionsRoundRobinAssignment(); @@ -179,7 +187,7 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { private Map assignStickyPartitions(int minQuota) { Map potentiallyUnfilledMembers = new HashMap<>(); - assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { + groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { List validCurrentMemberAssignment = validCurrentMemberAssignment( assignmentMemberSpec.assignedPartitions() ); @@ -198,20 +206,28 @@ private Map assignStickyPartitions(int minQuota) { topicIdPartition.topicId(), topicIdPartition.partitionId() ); - unassignedPartitions.remove(topicIdPartition); }); - // The extra partition is located at the last index from the previous step. - if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) { - TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount); - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - unassignedPartitions.remove(topicIdPartition); - remainingMembersToGetAnExtraPartition--; + if (remaining < 0) { + // The extra partition is located at the last index from the previous step. + if (remainingMembersToGetAnExtraPartition > 0) { + TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); + addPartitionToAssignment( + targetAssignment, + memberId, + topicIdPartition.topicId(), + topicIdPartition.partitionId() + ); + remainingMembersToGetAnExtraPartition--; + } + // Any previously owned partitions that weren't retained due to the quotas + // are added to the unassigned partitions set. + if (retainedPartitionsCount < currentAssignmentSize) { + unassignedPartitions.addAll(validCurrentMemberAssignment.subList( + retainedPartitionsCount, + currentAssignmentSize + )); + } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java index 27b38cc6c4be4..13b0ee30773be 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java @@ -34,12 +34,12 @@ public interface PartitionAssignor { /** * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec which includes member metadata. + * @param groupSpec The assignment spec which includes member metadata. * @param subscribedTopicDescriber The topic and partition metadata describer. * @return The new assignment for the group. */ GroupAssignment assign( - AssignmentSpec assignmentSpec, + GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java index 00fcd7d37e77a..482ad02f13aa5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignorException.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.errors.ApiException; /** - * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception + * Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}}. The exception * is only used internally. */ public class PartitionAssignorException extends ApiException { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index a1631f507a8ee..8393353a9ec8b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -81,20 +81,20 @@ public MemberWithRemainingAssignments(String memberId, int remaining) { * Returns a map of topic Ids to a list of members subscribed to them, * based on the given assignment specification and metadata. * - * @param assignmentSpec The specification for member assignments. + * @param groupSpec The specification for member assignments. * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. * @return A map of topic Ids to a list of member Ids subscribed to them. * * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. */ private Map> membersPerTopic( - final AssignmentSpec assignmentSpec, + final GroupSpec groupSpec, final SubscribedTopicDescriber subscribedTopicDescriber ) { Map> membersPerTopic = new HashMap<>(); - Map membersData = assignmentSpec.members(); + Map membersData = groupSpec.members(); - if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) { + if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { Set allMembers = membersData.keySet(); Collection topics = membersData.values().iterator().next().subscribedTopicIds(); @@ -139,7 +139,7 @@ private Map> membersPerTopic( */ @Override public GroupAssignment assign( - final AssignmentSpec assignmentSpec, + final GroupSpec groupSpec, final SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { @@ -147,7 +147,7 @@ public GroupAssignment assign( // Step 1 Map> membersPerTopic = membersPerTopic( - assignmentSpec, + groupSpec, subscribedTopicDescriber ); @@ -162,7 +162,7 @@ public GroupAssignment assign( List potentiallyUnfilledMembers = new ArrayList<>(); for (String memberId : membersForTopic) { - Set assignedPartitionsForTopic = assignmentSpec.members().get(memberId) + Set assignedPartitionsForTopic = groupSpec.members().get(memberId) .assignedPartitions().getOrDefault(topicId, Collections.emptySet()); int currentAssignmentSize = assignedPartitionsForTopic.size(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index aff9365dc70af..caa0de48c2be2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -57,28 +57,28 @@ public String name() { * Perform the group assignment given the current members and * topics metadata. * - * @param assignmentSpec The assignment specification that included member metadata. + * @param groupSpec The assignment specification that included member metadata. * @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}. * @return The new target assignment for the group. */ @Override public GroupAssignment assign( - AssignmentSpec assignmentSpec, + GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { AbstractUniformAssignmentBuilder assignmentBuilder; - if (assignmentSpec.members().isEmpty()) + if (groupSpec.members().isEmpty()) return new GroupAssignment(Collections.emptyMap()); - if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) { + if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " + "optimized assignment algorithm"); - assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber); + assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); } else { LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the " + "general assignment algorithm"); - assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber); + assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); } return assignmentBuilder.buildAssignment(); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index dd3a6f2e7bfd1..007971a95dd24 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -170,6 +170,12 @@ public static class DeadlineAndEpoch { */ private final TimelineHashMap targetAssignment; + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. + */ + private final TimelineHashMap> invertedTargetAssignment; + /** * The current partition epoch maps each topic-partitions to their current epoch where * the epoch is the epoch of their owners. When a member revokes a partition, it removes @@ -221,6 +227,7 @@ public ConsumerGroup( this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS); this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); + this.invertedTargetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); this.metrics = Objects.requireNonNull(metrics); this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry); @@ -517,21 +524,89 @@ public Assignment targetAssignment(String memberId) { } /** - * Updates target assignment of a member. + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ + public Map> invertedTargetAssignment() { + return Collections.unmodifiableMap(invertedTargetAssignment); + } + + /** + * Updates the target assignment of a member. * * @param memberId The member id. * @param newTargetAssignment The new target assignment. */ public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { + updateInvertedTargetAssignment( + memberId, + targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())), + newTargetAssignment + ); targetAssignment.put(memberId, newTargetAssignment); } + /** + * Updates the reverse lookup map of the target assignment. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + */ + private void updateInvertedTargetAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment + ) { + // Combine keys from both old and new assignments. + Set allTopicIds = new HashSet<>(); + allTopicIds.addAll(oldTargetAssignment.partitions().keySet()); + allTopicIds.addAll(newTargetAssignment.partitions().keySet()); + + for (Uuid topicId : allTopicIds) { + Set oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + Set newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + + TimelineHashMap topicPartitionAssignment = invertedTargetAssignment.computeIfAbsent( + topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size())) + ); + + // Remove partitions that aren't present in the new assignment only if the partition is currently + // still assigned to the member in question. + // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to + // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment. + for (Integer partition : oldPartitions) { + if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) { + topicPartitionAssignment.remove(partition); + } + } + + // Add partitions that are in the new assignment but not in the old assignment. + for (Integer partition : newPartitions) { + if (!oldPartitions.contains(partition)) { + topicPartitionAssignment.put(partition, memberId); + } + } + + if (topicPartitionAssignment.isEmpty()) { + invertedTargetAssignment.remove(topicId); + } else { + invertedTargetAssignment.put(topicId, topicPartitionAssignment); + } + } + } + /** * Removes the target assignment of a member. * * @param memberId The member id. */ public void removeTargetAssignment(String memberId) { + updateInvertedTargetAssignment( + memberId, + targetAssignment.getOrDefault(memberId, Assignment.EMPTY), + Assignment.EMPTY + ); targetAssignment.remove(memberId); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 09a44b17c834b..57d6039fa0ba8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.CoordinatorRecord; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; @@ -126,6 +126,12 @@ public Map targetAssignment() { */ private Map targetAssignment = Collections.emptyMap(); + /** + * Reverse lookup map representing topic partitions with + * their current member assignments. + */ + private Map> invertedTargetAssignment = Collections.emptyMap(); + /** * The topics image. */ @@ -224,6 +230,19 @@ public TargetAssignmentBuilder withTargetAssignment( return this; } + /** + * Adds the existing topic partition assignments. + * + * @param invertedTargetAssignment The reverse lookup map of the current target assignment. + * @return This object. + */ + public TargetAssignmentBuilder withInvertedTargetAssignment( + Map> invertedTargetAssignment + ) { + this.invertedTargetAssignment = invertedTargetAssignment; + return this; + } + /** * Adds the topics image. * @@ -317,7 +336,11 @@ public TargetAssignmentResult build() throws PartitionAssignorException { // Compute the assignment. GroupAssignment newGroupAssignment = assignor.assign( - new AssignmentSpec(Collections.unmodifiableMap(memberSpecs), subscriptionType), + new GroupSpecImpl( + Collections.unmodifiableMap(memberSpecs), + subscriptionType, + invertedTargetAssignment + ), new SubscribedTopicMetadata(topicMetadataMap) ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java index 8b6a09b2ed84f..74bb303abd03e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import java.util.AbstractMap; @@ -82,4 +83,33 @@ public static void assertAssignment( assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); }); } + + /** + * Generate a reverse look up map of partition to member target assignments from the given member spec. + * + * @param memberSpec A map where the key is the member Id and the value is an + * AssignmentMemberSpec object containing the member's partition assignments. + * @return Map of topic partition to member assignments. + */ + public static Map> invertedTargetAssignment( + Map memberSpec + ) { + Map> invertedTargetAssignment = new HashMap<>(); + for (Map.Entry memberEntry : memberSpec.entrySet()) { + String memberId = memberEntry.getKey(); + Map> topicsAndPartitions = memberEntry.getValue().assignedPartitions(); + + for (Map.Entry> topicEntry : topicsAndPartitions.entrySet()) { + Uuid topicId = topicEntry.getKey(); + Set partitions = topicEntry.getValue(); + + Map partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>()); + + for (Integer partitionId : partitions) { + partitionMap.put(partitionId, memberId); + } + } + } + return invertedTargetAssignment; + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java index 736ee4395a58b..1fdbb31125c17 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; @@ -45,7 +45,7 @@ public String name() { } @Override - public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { + public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException { return prepareGroupAssignment; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java index 05b8cdc4c76c6..cf15ee58654dd 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.coordinator.group; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -34,8 +34,8 @@ public String name() { } @Override - public GroupAssignment assign(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { - return new GroupAssignment(assignmentSpec.members().entrySet() + public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + return new GroupAssignment(groupSpec.members().entrySet() .stream() .collect(Collectors.toMap( Map.Entry::getKey, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java index 32f9618ff92d5..f5a7be24bf2a9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java @@ -32,6 +32,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -79,8 +80,16 @@ public void testTwoMembersNoTopicSubscription() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); + + GroupAssignment groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); assertEquals(Collections.emptyMap(), groupAssignment.members()); } @@ -113,10 +122,14 @@ public void testTwoMembersSubscribedToNonexistentTopics() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); assertThrows(PartitionAssignorException.class, - () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); + () -> assignor.assign(groupSpec, subscribedTopicMetadata)); } @Test @@ -149,10 +162,17 @@ public void testFirstAssignmentTwoMembersTwoTopics() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -202,10 +222,17 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment. Map>> expectedAssignment = new HashMap<>(); @@ -285,10 +312,17 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -362,10 +396,17 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -429,10 +470,17 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -496,10 +544,17 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -554,10 +609,17 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java new file mode 100644 index 0000000000000..4060b1a4f1b0d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GroupSpecImplTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class GroupSpecImplTest { + + private Map members; + private SubscriptionType subscriptionType; + private Map> invertedTargetAssignment; + private GroupSpecImpl groupSpec; + private Uuid topicId; + + @BeforeEach + void setUp() { + members = new HashMap<>(); + + subscriptionType = SubscriptionType.HOMOGENEOUS; + invertedTargetAssignment = new HashMap<>(); + topicId = Uuid.randomUuid(); + + members.put("test-member", new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + new HashSet<>(Collections.singletonList(topicId)), + Collections.emptyMap()) + ); + + groupSpec = new GroupSpecImpl( + members, + subscriptionType, + invertedTargetAssignment + ); + } + + @Test + void testMembers() { + assertEquals(members, groupSpec.members()); + } + + @Test + void testSubscriptionType() { + assertEquals(subscriptionType, groupSpec.subscriptionType()); + } + + @Test + void testIsPartitionAssigned() { + Map partitionMap = new HashMap<>(); + partitionMap.put(1, "test-member"); + invertedTargetAssignment.put(topicId, partitionMap); + + assertTrue(groupSpec.isPartitionAssigned(topicId, 1)); + assertFalse(groupSpec.isPartitionAssigned(topicId, 2)); + assertFalse(groupSpec.isPartitionAssigned(Uuid.randomUuid(), 2)); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 4fe748c3c03e0..f21bd63735fb1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -35,6 +35,7 @@ import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -77,8 +78,16 @@ public void testOneMemberNoTopicSubscription() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + + GroupAssignment groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); assertEquals(Collections.emptyMap(), groupAssignment.members()); } @@ -107,10 +116,14 @@ public void testOneMemberSubscribedToNonexistentTopic() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); assertThrows(PartitionAssignorException.class, - () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); + () -> assignor.assign(groupSpec, subscribedTopicMetadata)); } @Test @@ -143,11 +156,6 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 2), @@ -158,6 +166,18 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { mkTopicAssignment(topic3Uuid, 0) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -192,11 +212,6 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment. Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( @@ -209,6 +224,18 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { Collections.emptyMap() ); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -236,10 +263,17 @@ public void testValidityAndBalanceForLargeSampleSet() { )); } - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); checkValidityAndBalance(members, computedAssignment); } @@ -261,7 +295,6 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( )); Map members = new TreeMap<>(); - Map> currentAssignmentForA = new TreeMap<>( mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), @@ -288,11 +321,6 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), @@ -303,6 +331,18 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( mkTopicAssignment(topic2Uuid, 1, 2) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -352,11 +392,6 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 2, 3, 5), @@ -367,6 +402,18 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { mkTopicAssignment(topic2Uuid, 1, 2, 3) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -419,11 +466,6 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 2) @@ -436,6 +478,18 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe mkTopicAssignment(topic2Uuid, 0, 2) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -482,11 +536,6 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM // Member C was removed - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 2), @@ -497,6 +546,18 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM mkTopicAssignment(topic2Uuid, 1, 2) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } @@ -542,11 +603,6 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); - Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( mkTopicAssignment(topic2Uuid, 0) @@ -555,6 +611,18 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith mkTopicAssignment(topic2Uuid, 1) )); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + assertAssignment(expectedAssignment, computedAssignment); checkValidityAndBalance(members, computedAssignment); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index b52681da48031..7acb9bd5f750c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -32,6 +32,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -74,8 +75,16 @@ public void testOneConsumerNoTopic() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + + GroupAssignment groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); assertEquals(Collections.emptyMap(), groupAssignment.members()); } @@ -104,10 +113,14 @@ public void testOneConsumerSubscribedToNonExistentTopic() { ) ); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); assertThrows(PartitionAssignorException.class, - () -> assignor.assign(assignmentSpec, subscribedTopicMetadata)); + () -> assignor.assign(groupSpec, subscribedTopicMetadata)); } @Test @@ -126,8 +139,6 @@ public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { createPartitionRacks(2) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); members.put(consumerA, new AssignmentMemberSpec( @@ -144,16 +155,23 @@ public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map>> expectedAssignment = new HashMap<>(); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic3Uuid, 0) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic3Uuid, 1) @@ -184,8 +202,6 @@ public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() createPartitionRacks(2) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); members.put(consumerA, new AssignmentMemberSpec( @@ -209,20 +225,26 @@ public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1, 2), mkTopicAssignment(topic2Uuid, 0, 1) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic3Uuid, 0) )); - expectedAssignment.put(consumerC, mkAssignment( mkTopicAssignment(topic2Uuid, 2), mkTopicAssignment(topic3Uuid, 1) @@ -247,8 +269,6 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { createPartitionRacks(2) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); members.put(consumerA, new AssignmentMemberSpec( @@ -272,8 +292,17 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); // Topic 3 has 2 partitions but three consumers subscribed to it - one of them will not get a partition. @@ -281,12 +310,10 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic3Uuid, 0) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic3Uuid, 1) )); - expectedAssignment.put(consumerC, mkAssignment( mkTopicAssignment(topic1Uuid, 2) )); @@ -310,15 +337,12 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA createPartitionRacks(2) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic2Uuid, 0) ); - members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -330,7 +354,6 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 1) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -346,16 +369,23 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map>> expectedAssignment = new HashMap<>(); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic2Uuid, 0) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 1) @@ -383,15 +413,12 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { createPartitionRacks(4) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic2Uuid, 0, 1) ); - members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -403,7 +430,6 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -411,16 +437,23 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map>> expectedAssignment = new HashMap<>(); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic2Uuid, 0, 1) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 2, 3), mkTopicAssignment(topic2Uuid, 2, 3) @@ -445,15 +478,12 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon createPartitionRacks(3) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic2Uuid, 0, 1) ); - members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -465,7 +495,6 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -481,21 +510,27 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic2Uuid, 0) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) )); - expectedAssignment.put(consumerC, mkAssignment( mkTopicAssignment(topic1Uuid, 1), mkTopicAssignment(topic2Uuid, 1) @@ -521,15 +556,12 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig createPartitionRacks(3) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic2Uuid, 0, 1) ); - members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -541,7 +573,6 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -557,21 +588,27 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig Collections.emptyMap() )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map>> expectedAssignment = new HashMap<>(); + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1), mkTopicAssignment(topic2Uuid, 0, 1) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) )); - expectedAssignment.put(consumerC, mkAssignment( mkTopicAssignment(topic1Uuid, 3) )); @@ -595,8 +632,6 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC createPartitionRacks(3) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - Map members = new TreeMap<>(); // Consumer A was removed @@ -604,7 +639,6 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 2) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -612,11 +646,19 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC currentAssignmentForB )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1, 2), mkTopicAssignment(topic2Uuid, 0, 1, 2) @@ -647,18 +689,14 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme createPartitionRacks(2) )); - SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); - - Map members = new TreeMap<>(); - // Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3 // Change the subscriptions to A -> T1 // B -> T1, T2, T3 // C -> T2 + Map members = new TreeMap<>(); Map> currentAssignmentForA = mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1, 2), mkTopicAssignment(topic2Uuid, 0) ); - members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -669,7 +707,6 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme Map> currentAssignmentForB = mkAssignment( mkTopicAssignment(topic2Uuid, 1) ); - members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -681,7 +718,6 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme mkTopicAssignment(topic2Uuid, 2), mkTopicAssignment(topic3Uuid, 0, 1) ); - members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), @@ -689,21 +725,27 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme currentAssignmentForC )); - AssignmentSpec assignmentSpec = new AssignmentSpec(members, HETEROGENEOUS); - GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(consumerA, mkAssignment( mkTopicAssignment(topic1Uuid, 0, 1) )); - expectedAssignment.put(consumerB, mkAssignment( mkTopicAssignment(topic1Uuid, 2), mkTopicAssignment(topic2Uuid, 0, 1), mkTopicAssignment(topic3Uuid, 0, 1) )); - expectedAssignment.put(consumerC, mkAssignment( mkTopicAssignment(topic2Uuid, 2) )); @@ -711,7 +753,10 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme assertAssignment(expectedAssignment, computedAssignment); } - private void assertAssignment(Map>> expectedAssignment, GroupAssignment computedGroupAssignment) { + private void assertAssignment( + Map>> expectedAssignment, + GroupAssignment computedGroupAssignment + ) { assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size()); for (String memberId : computedGroupAssignment.members().keySet()) { Map> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).targetPartitions(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 4ae14c25439b7..784bb6077346b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -834,6 +834,90 @@ public void testUpdateSubscribedTopicNamesAndSubscriptionType() { ); } + @Test + public void testUpdateInvertedAssignment() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + GroupCoordinatorMetricsShard metricsShard = mock(GroupCoordinatorMetricsShard.class); + ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, "test-group", metricsShard); + Uuid topicId = Uuid.randomUuid(); + String memberId1 = "member1"; + String memberId2 = "member2"; + + // Initial assignment for member1 + Assignment initialAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + consumerGroup.updateTargetAssignment(memberId1, initialAssignment); + + // Verify that partition 0 is assigned to member1. + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(0, memberId1))) + ), + consumerGroup.invertedTargetAssignment() + ); + + // New assignment for member1 + Assignment newAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(1)) + )); + consumerGroup.updateTargetAssignment(memberId1, newAssignment); + + // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId1))) + ), + consumerGroup.invertedTargetAssignment() + ); + + // New assignment for member2 to add partition 1 + Assignment newAssignment2 = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(1)) + )); + consumerGroup.updateTargetAssignment(memberId2, newAssignment2); + + // Verify that partition 1 is assigned to member2 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId2))) + ), + consumerGroup.invertedTargetAssignment() + ); + + // New assignment for member1 to revoke partition 1 and assign partition 0 + Assignment newAssignment1 = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + consumerGroup.updateTargetAssignment(memberId1, newAssignment1); + + // Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1 + assertEquals( + mkMap( + mkEntry(topicId, mkMap( + mkEntry(0, memberId1), + mkEntry(1, memberId2) + )) + ), + consumerGroup.invertedTargetAssignment() + ); + + // Test remove target assignment for member1 + consumerGroup.removeTargetAssignment(memberId1); + + // Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId2))) + ), + consumerGroup.invertedTargetAssignment() + ); + } + @Test public void testMetadataRefreshDeadline() { MockTime time = new MockTime(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index 3a03a16228a2b..d5ba038f31895 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -17,9 +17,10 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.AssignmentTestUtil; import org.apache.kafka.coordinator.group.MetadataImageBuilder; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; @@ -207,8 +208,11 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadataMap); SubscriptionType subscriptionType = HOMOGENEOUS; + // Prepare the member assignments per topic partition. + Map> invertedTargetAssignment = AssignmentTestUtil.invertedTargetAssignment(memberSpecs); + // Prepare the expected assignment spec. - AssignmentSpec assignmentSpec = new AssignmentSpec(memberSpecs, subscriptionType); + GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, subscriptionType, invertedTargetAssignment); // We use `any` here to always return an assignment but use `verify` later on // to ensure that the input was correct. @@ -222,6 +226,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { .withSubscriptionMetadata(subscriptionMetadata) .withSubscriptionType(subscriptionType) .withTargetAssignment(targetAssignment) + .withInvertedTargetAssignment(invertedTargetAssignment) .withTopicsImage(topicsImage); // Add the updated members or delete the deleted members. @@ -239,7 +244,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { // Verify that the assignor was called once with the expected // assignment spec. verify(assignor, times(1)) - .assign(assignmentSpec, subscribedTopicMetadata); + .assign(groupSpec, subscribedTopicMetadata); return result; } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java new file mode 100644 index 0000000000000..2d2edf222f9dd --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.image.MetadataDelta; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class AssignorBenchmarkUtils { + /** + * Generate a reverse look up map of partition to member target assignments from the given member spec. + * + * @param groupAssignment The group assignment. + * @return Map of topic partition to member assignments. + */ + public static Map> computeInvertedTargetAssignment( + GroupAssignment groupAssignment + ) { + Map> invertedTargetAssignment = new HashMap<>(); + for (Map.Entry memberEntry : groupAssignment.members().entrySet()) { + String memberId = memberEntry.getKey(); + Map> topicsAndPartitions = memberEntry.getValue().targetPartitions(); + + for (Map.Entry> topicEntry : topicsAndPartitions.entrySet()) { + Uuid topicId = topicEntry.getKey(); + Set partitions = topicEntry.getValue(); + + Map partitionMap = invertedTargetAssignment.computeIfAbsent(topicId, k -> new HashMap<>()); + + for (Integer partitionId : partitions) { + partitionMap.put(partitionId, memberId); + } + } + } + return invertedTargetAssignment; + } + + public static void addTopic( + MetadataDelta delta, + Uuid topicId, + String topicName, + int numPartitions + ) { + // For testing purposes, the following criteria are used: + // - Number of replicas for each partition: 2 + // - Number of brokers available in the cluster: 4 + delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); + for (int i = 0; i < numPartitions; i++) { + delta.replay(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(i) + .setReplicas(Arrays.asList(i % 4, (i + 1) % 4))); + } + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index eb9c4ee6e2702..2349350e4ba09 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -18,7 +18,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -29,10 +29,12 @@ import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; + import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.TopicsImage; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -119,7 +121,7 @@ public enum AssignmentType { private static final int MAX_BUCKET_COUNT = 5; - private AssignmentSpec assignmentSpec; + private GroupSpecImpl groupSpec; private SubscribedTopicDescriber subscribedTopicDescriber; @@ -160,7 +162,13 @@ private Map createTopicMetadata() { partitionsPerTopicCount, partitionRacks )); - TargetAssignmentBuilderBenchmark.addTopic(delta, topicUuid, topicName, partitionsPerTopicCount); + + AssignorBenchmarkUtils.addTopic( + delta, + topicUuid, + topicName, + partitionsPerTopicCount + ); } topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); @@ -207,7 +215,7 @@ private void createAssignmentSpec() { } } - this.assignmentSpec = new AssignmentSpec(members, subscriptionType); + this.groupSpec = new GroupSpecImpl(members, subscriptionType, Collections.emptyMap()); } private Optional rackId(int memberIndex) { @@ -243,16 +251,23 @@ private static Map> mkMapOfPartitionRacks(int numPartitions } private void simulateIncrementalRebalance() { - GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + GroupAssignment initialAssignment = partitionAssignor.assign(groupSpec, subscribedTopicDescriber); Map members = initialAssignment.members(); + Map> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment); + Map updatedMembers = new HashMap<>(); - members.forEach((memberId, memberAssignment) -> { - AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); + + groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { + MemberAssignment memberAssignment = members.getOrDefault( + memberId, + new MemberAssignment(Collections.emptyMap()) + ); + updatedMembers.put(memberId, new AssignmentMemberSpec( - memberSpec.instanceId(), - memberSpec.rackId(), - memberSpec.subscribedTopicIds(), + assignmentMemberSpec.instanceId(), + assignmentMemberSpec.rackId(), + assignmentMemberSpec.subscribedTopicIds(), memberAssignment.targetPartitions() )); }); @@ -272,13 +287,13 @@ private void simulateIncrementalRebalance() { Collections.emptyMap() )); - assignmentSpec = new AssignmentSpec(updatedMembers, subscriptionType); + groupSpec = new GroupSpecImpl(updatedMembers, subscriptionType, invertedTargetAssignment); } @Benchmark @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void doAssignment() { - partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + partitionAssignor.assign(groupSpec, subscribedTopicDescriber); } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 3244f7ea82546..511db01c86219 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -17,10 +17,8 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.metadata.PartitionRecord; -import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; -import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupSpecImpl; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; @@ -36,6 +34,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.TopicsImage; + import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -51,7 +50,6 @@ import org.openjdk.jmh.annotations.Warmup; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -90,7 +88,9 @@ public class TargetAssignmentBuilderBenchmark { private TargetAssignmentBuilder targetAssignmentBuilder; - private AssignmentSpec assignmentSpec; + private GroupSpecImpl groupSpec; + + private Map> invertedTargetAssignment; private final List allTopicNames = new ArrayList<>(); @@ -104,7 +104,7 @@ public void setup() { subscriptionMetadata = generateMockSubscriptionMetadata(); Map members = generateMockMembers(); - Map existingTargetAssignment = generateMockInitialTargetAssignment(); + Map existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(); ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember") .setSubscribedTopicNames(allTopicNames) @@ -113,8 +113,9 @@ public void setup() { targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor) .withMembers(members) .withSubscriptionMetadata(subscriptionMetadata) - .withTargetAssignment(existingTargetAssignment) .withSubscriptionType(HOMOGENEOUS) + .withTargetAssignment(existingTargetAssignment) + .withInvertedTargetAssignment(invertedTargetAssignment) .withTopicsImage(topicsImage) .addOrUpdateMember(newMember.memberId(), newMember); } @@ -148,14 +149,20 @@ private Map generateMockSubscriptionMetadata() { Collections.emptyMap() ); subscriptionMetadata.put(topicName, metadata); - addTopic(delta, topicId, topicName, partitionsPerTopicCount); + + AssignorBenchmarkUtils.addTopic( + delta, + topicId, + topicName, + partitionsPerTopicCount + ); } topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); return subscriptionMetadata; } - private Map generateMockInitialTargetAssignment() { + private Map generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() { Map topicMetadataMap = new HashMap<>(topicCount); subscriptionMetadata.forEach((topicName, topicMetadata) -> topicMetadataMap.put( @@ -167,9 +174,10 @@ private Map generateMockInitialTargetAssignment() { createAssignmentSpec(); GroupAssignment groupAssignment = partitionAssignor.assign( - assignmentSpec, + groupSpec, new SubscribedTopicMetadata(topicMetadataMap) ); + invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment); Map initialTargetAssignment = new HashMap<>(memberCount); @@ -198,25 +206,7 @@ private void createAssignmentSpec() { Collections.emptyMap() )); } - assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); - } - - public static void addTopic( - MetadataDelta delta, - Uuid topicId, - String topicName, - int numPartitions - ) { - // For testing purposes, the following criteria are used: - // - Number of replicas for each partition: 2 - // - Number of brokers available in the cluster: 4 - delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); - for (int i = 0; i < numPartitions; i++) { - delta.replay(new PartitionRecord() - .setTopicId(topicId) - .setPartitionId(i) - .setReplicas(Arrays.asList(i % 4, (i + 1) % 4))); - } + groupSpec = new GroupSpecImpl(members, HOMOGENEOUS, Collections.emptyMap()); } @Benchmark