KAFKA-16625: Reverse lookup map from topic partitions to members #15974

Merged on May 25, 2024 (60 commits)

Commits
7d92a43
Add isSubscriptionHomogeneous value in ConsumerGroup
rreddy-22 Apr 19, 2024
f4ec728
Add isSubscriptionHomogeneous flag to TargetAssignmentBuilder
rreddy-22 Apr 22, 2024
0cb1a5f
Add isSubscriptionHomogeneous flag to AssignmentSpec
rreddy-22 Apr 22, 2024
1b7b693
Edit existing tests to incorporate changes
rreddy-22 Apr 22, 2024
e7c68b5
add getter method in AssignmentSpec
rreddy-22 Apr 22, 2024
11b7b1f
Modify UniformAssignor to use flag to decide which assignment builder…
rreddy-22 Apr 22, 2024
e710fca
Add optimization to the Range Assignor using flag
rreddy-22 Apr 22, 2024
b74f713
Change flag to TimelineObject in Consumer Group
rreddy-22 Apr 22, 2024
0785f3b
Incorporate updating subs model into update subs topic names method
rreddy-22 Apr 23, 2024
b7c2a24
minor
rreddy-22 Apr 23, 2024
db1c601
Revert to brute force check for subsModelUpdate
rreddy-22 Apr 24, 2024
2afb1bc
Convert group subscription model to enum from isSubscriptionHomogenen…
rreddy-22 Apr 24, 2024
50812d5
Enum added in a separate file
rreddy-22 Apr 24, 2024
9dada36
Add unit test
rreddy-22 Apr 24, 2024
d63e491
minor
rreddy-22 Apr 25, 2024
e3a8b95
Rename subsModel to subsType, PR comments
rreddy-22 Apr 25, 2024
9fa0ab3
Use enum in benchmarks
rreddy-22 Apr 25, 2024
5c4c4ed
move out subs updates into the group metadata manager
rreddy-22 Apr 26, 2024
0d25f3a
minor
rreddy-22 Apr 26, 2024
a4ce616
minor
rreddy-22 Apr 26, 2024
9cdd729
Address comments
rreddy-22 Apr 29, 2024
9ca54c3
minor
rreddy-22 Apr 30, 2024
a36d59c
address comments
rreddy-22 May 2, 2024
9d4fbfe
Initialize target assignment map in range assignor
rreddy-22 May 3, 2024
01a7b6e
Update numOfMembers in heartbeat
rreddy-22 May 6, 2024
d0a0ef9
Revert initialising new map in range assignor
rreddy-22 May 8, 2024
7e6a7eb
Merge remote-tracking branch 'upstream/trunk' into KAFKA-16587_Add_Su…
rreddy-22 May 8, 2024
ca53724
MINOR: Fix streams javadoc links (#15900)
AyoubOm May 9, 2024
fa9f388
MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGro…
FrankYang0529 May 9, 2024
fa22563
MINOR: Add missing RPCs to security.html (#15878)
AndrewJSchofield May 9, 2024
65b82ef
Add new group assignment interface and method to get if partition is …
rreddy-22 May 9, 2024
d2e7ef4
Integrate using the lookup map in the opt uniform assignor
rreddy-22 May 10, 2024
5a2d2a6
Integrate using the lookup map in the consumer group state
rreddy-22 May 13, 2024
729c1f3
Integrate using the lookup map in the consumer group state minor
rreddy-22 May 13, 2024
213dfe5
Update some test files and server benchmark and minor hacks - draft
rreddy-22 May 13, 2024
505b859
Use Map<Uuid,Map<Integer,String> to track partition assignments
rreddy-22 May 15, 2024
91d20b0
Update tests and benchmarks
rreddy-22 May 15, 2024
346e618
Minor + all unit tests pass for optimized uniform
rreddy-22 May 15, 2024
c8e0c34
Merge remote-tracking branch 'upstream/trunk' into KAFKA-16625_Revers…
rreddy-22 May 15, 2024
66fcd89
revert mistake
rreddy-22 May 15, 2024
867103e
minor
rreddy-22 May 15, 2024
55c76ec
Update assignor tests with partition assignments - all tests pass
rreddy-22 May 16, 2024
bc47eff
Rename assignmentSpec and groupAssignmentSpec
rreddy-22 May 16, 2024
5de869c
more renaming, minor
rreddy-22 May 16, 2024
3ac9c27
minor
rreddy-22 May 16, 2024
57a8db9
revert param changes in benchmarks
rreddy-22 May 16, 2024
86bf48b
minor
rreddy-22 May 16, 2024
4055e09
Address comments
rreddy-22 May 18, 2024
f7d0af7
Rename partitionAssignments to invertedTargetAssignments
rreddy-22 May 21, 2024
555b519
InheritDocs, comments
rreddy-22 May 21, 2024
449b168
minor
rreddy-22 May 22, 2024
2eadcb3
comments
rreddy-22 May 22, 2024
65a2866
comments
rreddy-22 May 22, 2024
a99fadc
comments
rreddy-22 May 24, 2024
3a27648
edit import problems
rreddy-22 May 24, 2024
09c11e5
Merge remote-tracking branch 'upstream/trunk' into KAFKA-16625_Revers…
rreddy-22 May 24, 2024
756983a
move add topic to assignor utils
rreddy-22 May 24, 2024
e00c5ed
minor
rreddy-22 May 24, 2024
5bf574e
minor
rreddy-22 May 24, 2024
dccb583
minor
rreddy-22 May 24, 2024
@@ -1901,6 +1901,7 @@ private Assignment updateTargetAssignment(
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
+ .withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
@@ -107,8 +107,8 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/
private final PartitionMovements partitionMovements;

- public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
- this.members = assignmentSpec.members();
+ public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+ this.members = groupSpec.members();
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>();
this.membersPerTopic = new HashMap<>();
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;

import java.util.Map;

/**
* The group metadata specifications required to compute the target assignment.
*/
public interface GroupSpec {
/**
* @return Member metadata keyed by member Id.
*/
Map<String, AssignmentMemberSpec> members();

/**
* @return The group's subscription type.
*/
SubscriptionType subscriptionType();

/**
* @return True, if the partition is currently assigned to a member.
* False, otherwise.
*/
boolean isPartitionAssigned(Uuid topicId, int partitionId);
}
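
As a rough illustration of how an assignor might consume `isPartitionAssigned`, the sketch below lists the partitions of one topic that no member currently owns. Everything here is a stand-in: `PartitionLookup` is a hypothetical, trimmed-down version of `GroupSpec`, `unassignedPartitions` is an invented helper, and `java.util.UUID` replaces Kafka's `Uuid` so the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class GroupSpecUsageSketch {
    /** Hypothetical, trimmed-down stand-in for the GroupSpec lookup method. */
    interface PartitionLookup {
        boolean isPartitionAssigned(UUID topicId, int partitionId);
    }

    /** Lists the partition ids of one topic that no member currently owns. */
    static List<Integer> unassignedPartitions(PartitionLookup spec, UUID topicId, int partitionCount) {
        List<Integer> unassigned = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (!spec.isPartitionAssigned(topicId, partition)) {
                unassigned.add(partition);
            }
        }
        return unassigned;
    }

    public static void main(String[] args) {
        UUID topic = UUID.randomUUID();
        // Inverted target assignment: topic -> (partition -> member id).
        Map<UUID, Map<Integer, String>> inverted =
            Map.of(topic, Map.of(0, "member-a", 2, "member-b"));
        PartitionLookup spec = (topicId, partitionId) ->
            inverted.getOrDefault(topicId, Map.of()).containsKey(partitionId);
        System.out.println(unassignedPartitions(spec, topic, 4)); // prints [1, 3]
    }
}
```

Each check is a constant-time map lookup, so the caller does not need to scan every member's assignment to find out whether a partition is taken.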
@@ -16,13 +16,15 @@
*/
package org.apache.kafka.coordinator.group.assignor;

+ import org.apache.kafka.common.Uuid;
+
import java.util.Map;
import java.util.Objects;

/**
* The assignment specification for a consumer group.
*/
- public class AssignmentSpec {
+ public class GroupSpecImpl implements GroupSpec {
/**
* The member metadata keyed by member Id.
*/
@@ -33,44 +35,76 @@ public class AssignmentSpec {
*/
private final SubscriptionType subscriptionType;

- public AssignmentSpec(
+ /**
+ * Reverse lookup map representing topic partitions with
+ * their current member assignments.
Review comment (Contributor): I think I'm slightly confused again about the types of assignments.

Target Assignment: the assignment we will reconcile to.

Current Assignment: the previous group epoch's target assignment.

Is this correct?

Reply (Contributor Author): Yes.

+ */
+ private final Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
+
+ public GroupSpecImpl(
Map<String, AssignmentMemberSpec> members,
- SubscriptionType subscriptionType
+ SubscriptionType subscriptionType,
+ Map<Uuid, Map<Integer, String>> invertedTargetAssignment
) {
Objects.requireNonNull(members);
Objects.requireNonNull(subscriptionType);
+ Objects.requireNonNull(invertedTargetAssignment);
this.members = members;
this.subscriptionType = subscriptionType;
+ this.invertedTargetAssignment = invertedTargetAssignment;
}

/**
- * @return Member metadata keyed by member Id.
+ * {@inheritDoc}
*/
+ @Override
public Map<String, AssignmentMemberSpec> members() {
return members;
}

/**
- * @return The group's subscription type.
+ * {@inheritDoc}
*/
+ @Override
public SubscriptionType subscriptionType() {
return subscriptionType;
}

+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isPartitionAssigned(Uuid topicId, int partitionId) {
+ Map<Integer, String> partitionMap = invertedTargetAssignment.get(topicId);
+ if (partitionMap == null) {
+ return false;
+ }
+ return partitionMap.containsKey(partitionId);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- AssignmentSpec that = (AssignmentSpec) o;
+ GroupSpecImpl that = (GroupSpecImpl) o;
return subscriptionType == that.subscriptionType &&
- members.equals(that.members);
+ members.equals(that.members) &&
+ invertedTargetAssignment.equals(that.invertedTargetAssignment);
}

@Override
public int hashCode() {
- return Objects.hash(members, subscriptionType);
+ int result = members.hashCode();
+ result = 31 * result + subscriptionType.hashCode();
+ result = 31 * result + invertedTargetAssignment.hashCode();
+ return result;
}

@Override
public String toString() {
- return "AssignmentSpec(members=" + members + ", subscriptionType=" + subscriptionType.toString() + ')';
+ return "GroupSpecImpl(members=" + members +
+ ", subscriptionType=" + subscriptionType +
+ ", invertedTargetAssignment=" + invertedTargetAssignment +
+ ')';
}
}
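
The diff shown here does not include the code that populates `invertedTargetAssignment`. As a minimal sketch of the shape of that map, assuming a member-keyed target assignment is available, the hypothetical `invert` helper below flips `memberId -> (topicId -> partitions)` into `topicId -> (partitionId -> memberId)`. The class name, the helper, and the use of `java.util.UUID` in place of Kafka's `Uuid` are all illustrative assumptions, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class InvertAssignmentSketch {
    /**
     * Hypothetical helper: turns memberId -> (topicId -> partitions) into
     * topicId -> (partitionId -> memberId).
     */
    static Map<UUID, Map<Integer, String>> invert(Map<String, Map<UUID, Set<Integer>>> targetAssignment) {
        Map<UUID, Map<Integer, String>> inverted = new HashMap<>();
        targetAssignment.forEach((memberId, topics) ->
            topics.forEach((topicId, partitions) -> {
                // One inner map per topic, created on first use.
                Map<Integer, String> owners = inverted.computeIfAbsent(topicId, id -> new HashMap<>());
                for (int partition : partitions) {
                    owners.put(partition, memberId);
                }
            }));
        return inverted;
    }

    public static void main(String[] args) {
        UUID topic = UUID.randomUUID();
        Map<String, Map<UUID, Set<Integer>>> target = Map.of(
            "member-a", Map.of(topic, Set.of(0, 1)),
            "member-b", Map.of(topic, Set.of(2)));
        Map<UUID, Map<Integer, String>> inverted = invert(target);
        System.out.println(inverted.get(topic).get(2)); // prints member-b
    }
}
```

A full rebuild like this costs time proportional to the number of assigned partitions; whether the coordinator rebuilds the map or updates it incrementally is not visible in this part of the diff.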
@@ -59,7 +59,7 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
/**
* The assignment specification which includes member metadata.
*/
- private final AssignmentSpec assignmentSpec;
+ private final GroupSpec groupSpec;

/**
* The topic and partition metadata describer.
@@ -89,28 +89,29 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment
* The partitions that still need to be assigned.
* Initially this contains all the subscribed topics' partitions.
*/
- private Set<TopicIdPartition> unassignedPartitions;
+ private final Set<TopicIdPartition> unassignedPartitions;

/**
* The target assignment.
*/
private final Map<String, MemberAssignment> targetAssignment;

- OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
- this.assignmentSpec = assignmentSpec;
+ OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
+ this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
- this.subscribedTopicIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+ this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
this.potentiallyUnfilledMembers = new HashMap<>();
+ this.unassignedPartitions = new HashSet<>();
this.targetAssignment = new HashMap<>();
}

/**
* Here's the step-by-step breakdown of the assignment process:
*
* <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li>
- * <li> Initialize unassigned partitions to all the topic partitions and
- * remove partitions from the list as and when they are assigned.</li>
- * <li> For existing assignments, retain partitions based on the determined quota.</li>
+ * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the
+ * current target assignment.</li>
+ * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li>
* <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li>
* <li> Proceed with a round-robin assignment according to quotas.
* For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li>
@@ -124,28 +125,35 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
return new GroupAssignment(Collections.emptyMap());
}

+ // Check if the subscribed topicId is still valid.
+ // Update unassigned partitions based on the current target assignment
+ // and topic metadata.
for (Uuid topicId : subscribedTopicIds) {
int partitionCount = subscribedTopicDescriber.numPartitions(topicId);
if (partitionCount == -1) {
throw new PartitionAssignorException(
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
);
} else {
+ for (int i = 0; i < partitionCount; i++) {
+ if (!groupSpec.isPartitionAssigned(topicId, i)) {
+ unassignedPartitions.add(new TopicIdPartition(topicId, i));
+ }
+ }
totalPartitionsCount += partitionCount;
}
}

// The minimum required quota that each member needs to meet for a balanced assignment.
// This is the same for all members.
- final int numberOfMembers = assignmentSpec.members().size();
+ final int numberOfMembers = groupSpec.members().size();
final int minQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers;

- assignmentSpec.members().keySet().forEach(memberId ->
+ groupSpec.members().keySet().forEach(memberId ->
targetAssignment.put(memberId, new MemberAssignment(new HashMap<>())
));

- unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
potentiallyUnfilledMembers = assignStickyPartitions(minQuota);

unassignedPartitionsRoundRobinAssignment();
@@ -179,7 +187,7 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException {
private Map<String, Integer> assignStickyPartitions(int minQuota) {
Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();

- assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+ groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment(
assignmentMemberSpec.assignedPartitions()
);
@@ -198,20 +206,28 @@ private Map<String, Integer> assignStickyPartitions(int minQuota) {
topicIdPartition.topicId(),
topicIdPartition.partitionId()
);
- unassignedPartitions.remove(topicIdPartition);
});

- // The extra partition is located at the last index from the previous step.
- if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) {
- TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- unassignedPartitions.remove(topicIdPartition);
- remainingMembersToGetAnExtraPartition--;
+ if (remaining < 0) {
+ // The extra partition is located at the last index from the previous step.
+ if (remainingMembersToGetAnExtraPartition > 0) {
+ TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++);
+ addPartitionToAssignment(
+ targetAssignment,
+ memberId,
+ topicIdPartition.topicId(),
+ topicIdPartition.partitionId()
+ );
+ remainingMembersToGetAnExtraPartition--;
+ }
+ // Any previously owned partitions that weren't retained due to the quotas
+ // are added to the unassigned partitions set.
+ if (retainedPartitionsCount < currentAssignmentSize) {
+ unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
+ retainedPartitionsCount,
+ currentAssignmentSize
+ ));
+ }
+ }
}
}
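
The quota arithmetic and the "retain up to quota, return the rest to the pool" step can be traced on a toy input. The sketch below is a simplified model under stated assumptions, not the builder itself: it uses a single hypothetical topic whose partitions are plain integers, the names `StickyQuotaSketch`, `retain`, and `membersOwedAnExtra` are invented, and it assumes at least one member.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class StickyQuotaSketch {
    /** Partitions each member keeps after the sticky pass. */
    final Map<String, List<Integer>> retained = new LinkedHashMap<>();
    /** Partitions that still need an owner after the sticky pass. */
    final Set<Integer> unassigned = new TreeSet<>();

    /**
     * @param current         memberId -> partitions owned in the current target assignment
     * @param totalPartitions number of partitions in the single, hypothetical topic
     */
    void retain(Map<String, List<Integer>> current, int totalPartitions) {
        int minQuota = totalPartitions / current.size();
        int membersOwedAnExtra = totalPartitions % current.size();

        // Partitions that nobody owns start out unassigned.
        Set<Integer> owned = new TreeSet<>();
        current.values().forEach(owned::addAll);
        for (int p = 0; p < totalPartitions; p++) {
            if (!owned.contains(p)) unassigned.add(p);
        }

        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            List<Integer> previous = entry.getValue();
            int retainedCount = Math.min(previous.size(), minQuota);
            // A member over quota may keep one extra while extras remain.
            if (previous.size() > minQuota && membersOwedAnExtra > 0) {
                retainedCount++;
                membersOwedAnExtra--;
            }
            retained.put(entry.getKey(), new ArrayList<>(previous.subList(0, retainedCount)));
            // Everything beyond the retained prefix goes back to the pool.
            unassigned.addAll(previous.subList(retainedCount, previous.size()));
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> current = new LinkedHashMap<>();
        current.put("member-a", List.of(0, 1, 2, 3));
        current.put("member-b", List.of(4));
        current.put("member-c", List.of());
        StickyQuotaSketch sketch = new StickyQuotaSketch();
        sketch.retain(current, 7);
        System.out.println(sketch.retained);   // {member-a=[0, 1, 2], member-b=[4], member-c=[]}
        System.out.println(sketch.unassigned); // [3, 5, 6]
    }
}
```

With 7 partitions and 3 members the minimum quota is 2 and one member may hold a third partition, so `member-a` keeps three of its four partitions and the fourth joins the two partitions that were never owned.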

@@ -34,12 +34,12 @@ public interface PartitionAssignor {
/**
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
- * @param assignmentSpec The assignment spec which includes member metadata.
+ * @param groupSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
GroupAssignment assign(
- AssignmentSpec assignmentSpec,
+ GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException;
}
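
To make the contract concrete, here is a toy assignor written against hypothetical, much smaller interfaces. `MiniGroupSpec`, `MiniTopicDescriber`, `MiniAssignor`, and `ROUND_ROBIN` are invented for illustration and are not Kafka types; the only conventions borrowed from the diff are that a partition count of `-1` means the topic is missing from the metadata and that an empty group yields an empty assignment. `IllegalStateException` stands in for `PartitionAssignorException`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignorContractSketch {
    /** Hypothetical stand-ins for GroupSpec, SubscribedTopicDescriber and PartitionAssignor. */
    interface MiniGroupSpec { List<String> memberIds(); }
    interface MiniTopicDescriber { int numPartitions(String topic); }
    interface MiniAssignor {
        Map<String, List<Integer>> assign(MiniGroupSpec groupSpec, MiniTopicDescriber describer, String topic);
    }

    /** Deals partitions out to members in turn; throws if the topic is unknown. */
    static final MiniAssignor ROUND_ROBIN = (groupSpec, describer, topic) -> {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        List<String> members = groupSpec.memberIds();
        if (members.isEmpty()) {
            return assignment;
        }
        int partitionCount = describer.numPartitions(topic);
        if (partitionCount == -1) {
            throw new IllegalStateException("Topic " + topic + " doesn't exist in the topic metadata.");
        }
        members.forEach(member -> assignment.put(member, new ArrayList<>()));
        for (int partition = 0; partition < partitionCount; partition++) {
            assignment.get(members.get(partition % members.size())).add(partition);
        }
        return assignment;
    };

    public static void main(String[] args) {
        Map<String, List<Integer>> result = ROUND_ROBIN.assign(
            () -> List.of("member-a", "member-b"),
            topic -> topic.equals("orders") ? 3 : -1,
            "orders");
        System.out.println(result); // prints {member-a=[0, 2], member-b=[1]}
    }
}
```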
@@ -19,7 +19,7 @@
import org.apache.kafka.common.errors.ApiException;

/**
- * Exception thrown by {@link PartitionAssignor#assign(AssignmentSpec)}. The exception
+ * Exception thrown by {@link PartitionAssignor#assign(GroupSpec, SubscribedTopicDescriber)}. The exception
* is only used internally.
*/
public class PartitionAssignorException extends ApiException {
@@ -81,20 +81,20 @@ public MemberWithRemainingAssignments(String memberId, int remaining) {
* Returns a map of topic Ids to a list of members subscribed to them,
* based on the given assignment specification and metadata.
*
- * @param assignmentSpec The specification for member assignments.
+ * @param groupSpec The specification for member assignments.
* @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
* @return A map of topic Ids to a list of member Ids subscribed to them.
*
* @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
*/
private Map<Uuid, Collection<String>> membersPerTopic(
- final AssignmentSpec assignmentSpec,
+ final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) {
Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
- Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members();
+ Map<String, AssignmentMemberSpec> membersData = groupSpec.members();

- if (assignmentSpec.subscriptionType().equals(HOMOGENEOUS)) {
+ if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
Set<String> allMembers = membersData.keySet();
Collection<Uuid> topics = membersData.values().iterator().next().subscribedTopicIds();

@@ -139,15 +139,15 @@ private Map<Uuid, Collection<String>> membersPerTopic(
*/
@Override
public GroupAssignment assign(
- final AssignmentSpec assignmentSpec,
+ final GroupSpec groupSpec,
final SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {

Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();

// Step 1
Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
- assignmentSpec,
+ groupSpec,
subscribedTopicDescriber
);

@@ -162,7 +162,7 @@ public GroupAssignment assign(
List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<>();

for (String memberId : membersForTopic) {
- Set<Integer> assignedPartitionsForTopic = assignmentSpec.members().get(memberId)
+ Set<Integer> assignedPartitionsForTopic = groupSpec.members().get(memberId)
.assignedPartitions().getOrDefault(topicId, Collections.emptySet());

int currentAssignmentSize = assignedPartitionsForTopic.size();
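
The homogeneous shortcut in `membersPerTopic` above can be sketched in isolation. This is a simplified model under assumptions: topics are plain strings rather than `Uuid`s, the `SubscriptionType` enum is redeclared locally, the heterogeneous branch is a guess at the general case (that part of the method is collapsed in the diff), and the topic-existence check that throws `PartitionAssignorException` is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class MembersPerTopicSketch {
    /** Local stand-in for the subscription type enum used in the diff. */
    enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }

    /** Takes memberId -> subscribed topics and returns topic -> subscribed member ids. */
    static Map<String, Collection<String>> membersPerTopic(
        Map<String, Set<String>> subscriptions,
        SubscriptionType type
    ) {
        Map<String, Collection<String>> result = new HashMap<>();
        if (subscriptions.isEmpty()) {
            return result;
        }
        if (type == SubscriptionType.HOMOGENEOUS) {
            // Every member has the same subscription, so one member's topics
            // describe the whole group and each topic maps to all members.
            Set<String> allMembers = subscriptions.keySet();
            for (String topic : subscriptions.values().iterator().next()) {
                result.put(topic, allMembers);
            }
        } else {
            // Assumed general case: visit every member's subscription.
            subscriptions.forEach((memberId, topics) ->
                topics.forEach(topic ->
                    result.computeIfAbsent(topic, key -> new ArrayList<>()).add(memberId)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
        subscriptions.put("member-a", Set.of("orders", "payments"));
        subscriptions.put("member-b", Set.of("orders", "payments"));
        System.out.println(
            membersPerTopic(subscriptions, SubscriptionType.HOMOGENEOUS).get("orders")); // prints [member-a, member-b]
    }
}
```

In the homogeneous case the work is proportional to the number of topics only, because the same member set is shared by every entry instead of being rebuilt per topic.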