Skip to content

Commit

Permalink
KAFKA-16625; Reverse lookup map from topic partitions to members (apa…
Browse files Browse the repository at this point in the history
…che#15974)

This patch speeds up the computation of the unassigned partitions by exposing the inverted target assignment. It allows the assignor to check whether a partition is assigned or not.

Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
rreddy-22 authored and chiacyu committed Jun 1, 2024
1 parent cc3de23 commit 7e805bc
Show file tree
Hide file tree
Showing 23 changed files with 890 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,7 @@ private Assignment updateTargetAssignment(
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/
private final PartitionMovements partitionMovements;

public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.members = assignmentSpec.members();
public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.members = groupSpec.members();
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;

import java.util.Map;

/**
 * The group metadata specifications required to compute the target assignment.
 *
 * Exposes the members of the group, the group's subscription type, and a
 * per-partition check telling whether a partition is currently assigned.
 */
public interface GroupSpec {
/**
 * Returns the metadata of the members of the group.
 *
 * @return Member metadata keyed by member Id.
 */
Map<String, AssignmentMemberSpec> members();

/**
 * Returns the subscription type of the group.
 *
 * @return The group's subscription type.
 */
SubscriptionType subscriptionType();

/**
 * Checks whether the given topic partition is currently assigned to a member.
 *
 * @param topicId       The topic Id.
 * @param partitionId   The partition Id.
 *
 * @return True, if the partition is currently assigned to a member.
 *         False, otherwise.
 */
boolean isPartitionAssigned(Uuid topicId, int partitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.Objects;

/**
* The assignment specification for a consumer group.
*/
public class AssignmentSpec {
public class GroupSpecImpl implements GroupSpec {
/**
* The member metadata keyed by member Id.
*/
Expand All @@ -33,44 +35,76 @@ public class AssignmentSpec {
*/
private final SubscriptionType subscriptionType;

public AssignmentSpec(
/**
* Reverse lookup map representing topic partitions with
* their current member assignments.
*/
private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;

public GroupSpecImpl(
Map<String, AssignmentMemberSpec> members,
SubscriptionType subscriptionType
SubscriptionType subscriptionType,
Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
Objects.requireNonNull(members);
Objects.requireNonNull(subscriptionType);
Objects.requireNonNull(invertedTargetAssignment);
this.members = members;
this.subscriptionType = subscriptionType;
this.invertedTargetAssignment = invertedTargetAssignment;
}

/**
* @return Member metadata keyed by member Id.
* {@inheritDoc}
*/
@Override
public Map<String, AssignmentMemberSpec> members() {
return members;
}

/**
* @return The group's subscription type.
* {@inheritDoc}
*/
@Override
public SubscriptionType subscriptionType() {
return subscriptionType;
}

/**
 * {@inheritDoc}
 *
 * The answer comes from the inverted target assignment, which is looked up
 * first by topic Id and then by partition Id.
 */
@Override
public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
    final Map<Integer, String> memberByPartition = invertedTargetAssignment.get(topicId);
    // A topic with no entry in the inverted assignment has no assigned partitions.
    return memberByPartition != null && memberByPartition.containsKey(partitionId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AssignmentSpec that = (AssignmentSpec) o;
GroupSpecImpl that = (GroupSpecImpl) o;
return subscriptionType == that.subscriptionType &&
members.equals(that.members);
members.equals(that.members) &&
invertedTargetAssignment.equals(that.invertedTargetAssignment);
}

@Override
public int hashCode() {
return Objects.hash(members, subscriptionType);
int result = members.hashCode();
result = 31 * result + subscriptionType.hashCode();
result = 31 * result + invertedTargetAssignment.hashCode();
return result;
}

@Override
public String toString() {
return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')';
return "GroupSpecImpl(members=" + members +
", subscriptionType=" + subscriptionType +
", invertedTargetAssignment=" + invertedTargetAssignment +
')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
/**
* The assignment specification which includes member metadata.
*/
private final AssignmentSpec assignmentSpec;
private final GroupSpec groupSpec;

/**
* The topic and partition metadata describer.
Expand Down Expand Up @@ -89,28 +89,29 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
* The partitions that still need to be assigned.
* Initially this contains all the subscribed topics' partitions.
*/
private Set<TopicIdPartition> unassignedPartitions;
private final Set<TopicIdPartition> unassignedPartitions;

/**
* The target assignment.
*/
private final Map<String, MemberAssignment> targetAssignment;

OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.assignmentSpec = assignmentSpec;
OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
this.potentiallyUnfilledMembers = new HashMap<>();
this.unassignedPartitions = new HashSet<>();
this.targetAssignment = new HashMap<>();
}

/**
* Here's the step-by-step breakdown of the assignment process:
*
* <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
* <li> Initialize unassigned partitions to all the topic partitions and
* remove partitions from the list as and when they are assigned.</li>
* <li> For existing assignments, retain partitions based on the determined quota.</li>
* <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
* current target assignment.</li>
* <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
* <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
* <li> Proceed with a round-robin assignment according to quotas.
* For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
Expand All @@ -124,28 +125,35 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
return new GroupAssignment(Collections.emptyMap());
}

// Check if the subscribed topicId is still valid.
// Update unassigned partitions based on the current target assignment
// and topic metadata.
for (Uuid topicId : subscribedTopicIds) {
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
throw new PartitionAssignorException(
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
);
} else {
for (int i = 0; i < partitionCount; i++) {
if (!groupSpec.isPartitionAssigned(topicId, i)) {
unassignedPartitions.add(new TopicIdPartition(topicId, i));
}
}
totalPartitionsCount += partitionCount;
}
}

// The minimum required quota that each member needs to meet for a balanced assignment.
// This is the same for all members.
final int numberOfMembers = assignmentSpec.members().size();
final int numberOfMembers = groupSpec.members().size();
final int minQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;

assignmentSpec.members().keySet().forEach(memberId ->
groupSpec.members().keySet().forEach(memberId ->
targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
));

unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
potentiallyUnfilledMembers = assignStickyPartitions(minQuota);

unassignedPartitionsRoundRobinAssignment();
Expand Down Expand Up @@ -179,7 +187,7 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
private Map<String, Integer> assignStickyPartitions(int minQuota) {
Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();

assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
assignmentMemberSpec.assignedPartitions()
);
Expand All @@ -198,20 +206,28 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
unassignedPartitions.remove(topicIdPartition);
});

// The extra partition is located at the last index from the previous step.
if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
addPartitionToAssignment(
targetAssignment,
memberId,
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
unassignedPartitions.remove(topicIdPartition);
remainingMembersToGetAnExtraPartition--;
if (remaining < 0) {
// The extra partition is located at the last index from the previous step.
if (remainingMembersToGetAnExtraPartition > 0) {
TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
addPartitionToAssignment(
targetAssignment,
memberId,
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
remainingMembersToGetAnExtraPartition--;
}
// Any previously owned partitions that weren't retained due to the quotas
// are added to the unassigned partitions set.
if (retainedPartitionsCount < currentAssignmentSize) {
unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
retainedPartitionsCount,
currentAssignmentSize
));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public interface PartitionAssignor {
/**
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
* @param assignmentSpec The assignment spec which includes member metadata.
* @param groupSpec The group spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(
AssignmentSpec assignmentSpec,
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.errors.ApiException;

/**
* Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
* Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}. The exception
* is only used internally.
*/
public class PartitionAssignorException extends ApiException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,20 @@ public MemberWithRemainingAssignments(String memberId, int remaining) {
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
* @param assignmentSpec The specification for member assignments.
* @param groupSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, Collection<String>> membersPerTopic(
final AssignmentSpec assignmentSpec,
final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
Map<String, AssignmentMemberSpec> membersData = groupSpec.members();

if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
Set<String> allMembers = membersData.keySet();
Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();

Expand Down Expand Up @@ -139,15 +139,15 @@ private Map<Uuid, Collection<String>> membersPerTopic(
*/
@Override
public GroupAssignment assign(
final AssignmentSpec assignmentSpec,
final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {

Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();

// Step 1
Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
assignmentSpec,
groupSpec,
subscribedTopicDescriber
);

Expand All @@ -162,7 +162,7 @@ public GroupAssignment assign(
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();

for (String memberId : membersForTopic) {
Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId)
Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
.assignedPartitions().getOrDefault(topicId, Collections.emptySet());

int currentAssignmentSize = assignedPartitionsForTopic.size();
Expand Down
Loading

0 comments on commit 7e805bc

Please sign in to comment.