Skip to content

Commit

Permalink
KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
Browse files Browse the repository at this point in the history
This patch optimizes uniform (homogenous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) assignor.

Trunk:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  24.535 ± 1.583  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  24.094 ± 0.223  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
JMH benchmarks done
```

Patch:

```
Benchmark                                     (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
TargetAssignmentBuilderBenchmark.build                10000                         10           100  avgt    5  3.376 ± 0.577  ms/op
TargetAssignmentBuilderBenchmark.build                10000                         10          1000  avgt    5  3.731 ± 0.359  ms/op
JMH benchmarks done
```

```
Benchmark                                       (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         UNIFORM          false          10000                         10         HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
JMH benchmarks done
```

Reviewers: Ritika Reddy <[email protected]>, Jeff Kim <[email protected]>, Justine Olshan <[email protected]>
  • Loading branch information
dajac authored May 31, 2024
1 parent ca9f4ae commit fb566e4
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 322 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,19 @@ public GroupAssignment assign(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
AbstractUniformAssignmentBuilder assignmentBuilder;

if (groupSpec.members().isEmpty())
return new GroupAssignment(Collections.emptyMap());

if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm");
assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.build();
} else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm");
assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.buildAssignment();
}

return assignmentBuilder.buildAssignment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -42,32 +43,32 @@ public static Map.Entry<Uuid, Set<Integer>> mkTopicAssignment(
);
}

public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment(
public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment(
Uuid topicId,
Integer... partitions
) {
return new AbstractMap.SimpleEntry<>(
topicId,
new TreeSet<>(Arrays.asList(partitions))
new LinkedHashSet<>(Arrays.asList(partitions))
);
}

@SafeVarargs
public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
}
return assignment;
return Collections.unmodifiableMap(assignment);
}

@SafeVarargs
public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
public static Map<Uuid, Set<Integer>> mkOrderedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) {
Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
assignment.put(entry.getKey(), entry.getValue());
assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue()));
}
return assignment;
return Collections.unmodifiableMap(assignment);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@
import java.util.stream.Stream;

import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord;
Expand Down Expand Up @@ -297,7 +297,7 @@ public void testNewTargetAssignmentRecord() {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();

Map<Uuid, Set<Integer>> partitions = mkSortedAssignment(
Map<Uuid, Set<Integer>> partitions = mkOrderedAssignment(
mkTopicAssignment(topicId1, 11, 12, 13),
mkTopicAssignment(topicId2, 21, 22, 23)
);
Expand Down Expand Up @@ -379,14 +379,14 @@ public void testNewCurrentAssignmentRecord() {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();

Map<Uuid, Set<Integer>> assigned = mkSortedAssignment(
mkSortedTopicAssignment(topicId1, 11, 12, 13),
mkSortedTopicAssignment(topicId2, 21, 22, 23)
Map<Uuid, Set<Integer>> assigned = mkOrderedAssignment(
mkOrderedTopicAssignment(topicId1, 11, 12, 13),
mkOrderedTopicAssignment(topicId2, 21, 22, 23)
);

Map<Uuid, Set<Integer>> revoking = mkSortedAssignment(
mkSortedTopicAssignment(topicId1, 14, 15, 16),
mkSortedTopicAssignment(topicId2, 24, 25, 26)
Map<Uuid, Set<Integer>> revoking = mkOrderedAssignment(
mkOrderedTopicAssignment(topicId1, 14, 15, 16),
mkOrderedTopicAssignment(topicId2, 24, 25, 26)
);

CoordinatorRecord expectedRecord = new CoordinatorRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
Expand Down Expand Up @@ -158,12 +159,11 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {

Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic3Uuid, 1)
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic3Uuid, 0, 1)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic3Uuid, 0)
mkTopicAssignment(topic1Uuid, 1, 2)
));

GroupSpec groupSpec = new GroupSpecImpl(
Expand Down Expand Up @@ -295,30 +295,25 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment(
));

Map<String, AssignmentMemberSpec> members = new TreeMap<>();
Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
)
);

members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 1),
mkTopicAssignment(topic2Uuid, 0, 1)
)
));

Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 2),
mkTopicAssignment(topic2Uuid, 2)
)
));

Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
Expand Down Expand Up @@ -366,40 +361,34 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() {

Map<String, AssignmentMemberSpec> members = new TreeMap<>();

Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
));

Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
));

Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
mkTopicAssignment(topic2Uuid, 0, 4)
mkTopicAssignment(topic1Uuid, 0, 2, 3),
mkTopicAssignment(topic2Uuid, 0, 3, 4)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1, 4),
mkTopicAssignment(topic2Uuid, 1, 2, 3)
mkTopicAssignment(topic1Uuid, 1, 4, 5),
mkTopicAssignment(topic2Uuid, 1, 2)
));

GroupSpec groupSpec = new GroupSpecImpl(
Expand Down Expand Up @@ -436,26 +425,24 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe

Map<String, AssignmentMemberSpec> members = new HashMap<>();

Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
));
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
)
));

Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
mkOrderedAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
)
));

// Add a new member to trigger a re-assignment.
Expand Down Expand Up @@ -512,38 +499,36 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM

Map<String, AssignmentMemberSpec> members = new HashMap<>();

Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForA
mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
)
));

Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
mkSet(topic1Uuid, topic2Uuid),
currentAssignmentForB
mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
)
));

// Member C was removed

Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0)
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0, 2)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1, 2)
mkTopicAssignment(topic1Uuid, 1, 2),
mkTopicAssignment(topic2Uuid, 1)
));

GroupSpec groupSpec = new GroupSpecImpl(
Expand Down Expand Up @@ -581,26 +566,24 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith
// Initial subscriptions were [T1, T2]
Map<String, AssignmentMemberSpec> members = new HashMap<>();

Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
);
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForA
mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic2Uuid, 0)
)
));

Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
);
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Collections.singleton(topic2Uuid),
currentAssignmentForB
mkAssignment(
mkTopicAssignment(topic1Uuid, 1),
mkTopicAssignment(topic2Uuid, 1)
)
));

Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private void simulateIncrementalRebalance() {
assignmentMemberSpec.instanceId(),
assignmentMemberSpec.rackId(),
assignmentMemberSpec.subscribedTopicIds(),
memberAssignment.targetPartitions()
Collections.unmodifiableMap(memberAssignment.targetPartitions())
));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInve
for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
String memberId = entry.getKey();
Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();

Assignment assignment = new Assignment(topicPartitions);

initialTargetAssignment.put(memberId, assignment);
initialTargetAssignment.put(memberId, new Assignment(topicPartitions));
}

return initialTargetAssignment;
Expand Down

0 comments on commit fb566e4

Please sign in to comment.