forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
KAFKA-18319: Add task assignor interfaces (apache#18270)
Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA. The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later. Reviewers: Bill Bejeck <[email protected]>, David Jacot <[email protected]>
- Loading branch information
Showing
12 changed files
with
807 additions
and
0 deletions.
There are no files selected for viewing
60 changes: 60 additions & 0 deletions
60
...c/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
|
||
/** | ||
* The assignment specification for a Streams group member. | ||
* | ||
* @param instanceId The instance ID if provided. | ||
* @param rackId The rack ID if provided. | ||
* @param activeTasks Reconciled active tasks | ||
* @param standbyTasks Reconciled standby tasks | ||
* @param warmupTasks Reconciled warm-up tasks | ||
* @param processId The process ID. | ||
* @param clientTags The client tags for a rack-aware assignment. | ||
* @param taskOffsets The last received cumulative task offsets of assigned tasks or dormant tasks. | ||
*/ | ||
public record AssignmentMemberSpec(Optional<String> instanceId, | ||
Optional<String> rackId, | ||
Map<String, Set<Integer>> activeTasks, | ||
Map<String, Set<Integer>> standbyTasks, | ||
Map<String, Set<Integer>> warmupTasks, | ||
String processId, | ||
Map<String, String> clientTags, | ||
Map<TaskId, Long> taskOffsets, | ||
Map<TaskId, Long> taskEndOffsets | ||
) { | ||
|
||
public AssignmentMemberSpec { | ||
Objects.requireNonNull(instanceId); | ||
Objects.requireNonNull(rackId); | ||
activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)); | ||
standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)); | ||
warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks)); | ||
Objects.requireNonNull(processId); | ||
clientTags = Collections.unmodifiableMap(Objects.requireNonNull(clientTags)); | ||
taskOffsets = Collections.unmodifiableMap(Objects.requireNonNull(taskOffsets)); | ||
taskEndOffsets = Collections.unmodifiableMap(Objects.requireNonNull(taskEndOffsets)); | ||
} | ||
|
||
} |
33 changes: 33 additions & 0 deletions
33
...or/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* The task assignment for a streams group. | ||
* | ||
* @param members The member assignments keyed by member id. | ||
*/ | ||
public record GroupAssignment(Map<String, MemberAssignment> members) { | ||
|
||
public GroupAssignment { | ||
Objects.requireNonNull(members); | ||
} | ||
|
||
} |
36 changes: 36 additions & 0 deletions
36
...rdinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* The group metadata specifications required to compute the target assignment. | ||
*/ | ||
public interface GroupSpec { | ||
|
||
/** | ||
* @return Member metadata keyed by member Id. | ||
*/ | ||
Map<String, AssignmentMemberSpec> members(); | ||
|
||
/** | ||
* @return Any configurations passed to the assignor. | ||
*/ | ||
Map<String, String> assignmentConfigs(); | ||
|
||
} |
36 changes: 36 additions & 0 deletions
36
...ator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
/** | ||
* The assignment specification for a Streams group. | ||
* | ||
* @param members The member metadata keyed by member ID. | ||
* @param assignmentConfigs Any configurations passed to the assignor. | ||
*/ | ||
public record GroupSpecImpl(Map<String, AssignmentMemberSpec> members, | ||
Map<String, String> assignmentConfigs) implements GroupSpec { | ||
|
||
public GroupSpecImpl { | ||
Objects.requireNonNull(members); | ||
Objects.requireNonNull(assignmentConfigs); | ||
} | ||
|
||
} |
42 changes: 42 additions & 0 deletions
42
...r/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Collections; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Set; | ||
|
||
/** | ||
* The task assignment for a Streams group member. | ||
* | ||
* @param activeTasks The target tasks assigned to this member keyed by subtopologyId. | ||
*/ | ||
public record MemberAssignment(Map<String, Set<Integer>> activeTasks, | ||
Map<String, Set<Integer>> standbyTasks, | ||
Map<String, Set<Integer>> warmupTasks) { | ||
|
||
public MemberAssignment { | ||
Objects.requireNonNull(activeTasks); | ||
Objects.requireNonNull(standbyTasks); | ||
Objects.requireNonNull(warmupTasks); | ||
} | ||
|
||
public static MemberAssignment empty() { | ||
return new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
...nator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.PriorityQueue; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Mock implementation of {@link TaskAssignor} that assigns tasks to members in a round-robin fashion, with a bit of stickiness. | ||
*/ | ||
public class MockAssignor implements TaskAssignor { | ||
|
||
public static final String MOCK_ASSIGNOR_NAME = "mock"; | ||
|
||
@Override | ||
public String name() { | ||
return MOCK_ASSIGNOR_NAME; | ||
} | ||
|
||
@Override | ||
public GroupAssignment assign( | ||
final GroupSpec groupSpec, | ||
final TopologyDescriber topologyDescriber | ||
) throws TaskAssignorException { | ||
|
||
Map<String, MemberAssignment> newTargetAssignment = new HashMap<>(); | ||
Map<String, String[]> subtopologyToActiveMember = new HashMap<>(); | ||
|
||
for (String subtopology : topologyDescriber.subtopologies()) { | ||
int numberOfPartitions = topologyDescriber.numTasks(subtopology); | ||
subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]); | ||
} | ||
|
||
// Copy existing assignment and fill temporary data structures | ||
for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) { | ||
final String memberId = memberEntry.getKey(); | ||
final AssignmentMemberSpec memberSpec = memberEntry.getValue(); | ||
|
||
Map<String, Set<Integer>> activeTasks = new HashMap<>(memberSpec.activeTasks()); | ||
|
||
newTargetAssignment.put(memberId, new MemberAssignment(activeTasks, new HashMap<>(), new HashMap<>())); | ||
for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) { | ||
final String subtopologyId = entry.getKey(); | ||
final Set<Integer> taskIds = entry.getValue(); | ||
final String[] activeMembers = subtopologyToActiveMember.get(subtopologyId); | ||
for (int taskId : taskIds) { | ||
if (activeMembers[taskId] != null) { | ||
throw new TaskAssignorException( | ||
"Task " + taskId + " of subtopology " + subtopologyId + " is assigned to multiple members"); | ||
} | ||
activeMembers[taskId] = memberId; | ||
} | ||
} | ||
} | ||
|
||
// Define priority queue to sort members by task count | ||
PriorityQueue<MemberAndTaskCount> memberAndTaskCount = new PriorityQueue<>(Comparator.comparingInt(m -> m.taskCount)); | ||
memberAndTaskCount.addAll( | ||
newTargetAssignment.keySet().stream() | ||
.map(memberId -> new MemberAndTaskCount(memberId, | ||
newTargetAssignment.get(memberId).activeTasks().values().stream().mapToInt(Set::size).sum())) | ||
.collect(Collectors.toSet()) | ||
); | ||
|
||
// Assign unassigned tasks to members with the fewest tasks | ||
for (Map.Entry<String, String[]> entry : subtopologyToActiveMember.entrySet()) { | ||
final String subtopologyId = entry.getKey(); | ||
final String[] activeMembers = entry.getValue(); | ||
for (int i = 0; i < activeMembers.length; i++) { | ||
if (activeMembers[i] == null) { | ||
final MemberAndTaskCount m = memberAndTaskCount.poll(); | ||
if (m == null) { | ||
throw new TaskAssignorException("No member available to assign task " + i + " of subtopology " + subtopologyId); | ||
} | ||
newTargetAssignment.get(m.memberId).activeTasks().computeIfAbsent(subtopologyId, k -> new HashSet<>()).add(i); | ||
activeMembers[i] = m.memberId; | ||
memberAndTaskCount.add(new MemberAndTaskCount(m.memberId, m.taskCount + 1)); | ||
} | ||
} | ||
} | ||
|
||
return new GroupAssignment(newTargetAssignment); | ||
} | ||
|
||
private record MemberAndTaskCount(String memberId, int taskCount) { | ||
} | ||
} |
41 changes: 41 additions & 0 deletions
41
...nator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
/** | ||
* Server side task assignor used by streams groups. | ||
*/ | ||
public interface TaskAssignor { | ||
|
||
/** | ||
* Unique name for this assignor. | ||
*/ | ||
String name(); | ||
|
||
/** | ||
* Assigns tasks to group members based on the given assignment specification and topic metadata. | ||
* | ||
* @param groupSpec The assignment spec which includes member metadata. | ||
* @param topologyDescriber The task metadata describer. | ||
* @return The new assignment for the group. | ||
*/ | ||
GroupAssignment assign( | ||
GroupSpec groupSpec, | ||
TopologyDescriber topologyDescriber | ||
) throws TaskAssignorException; | ||
|
||
} |
33 changes: 33 additions & 0 deletions
33
.../main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignorException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.coordinator.group.streams.assignor; | ||
|
||
import org.apache.kafka.common.errors.ApiException; | ||
|
||
/** | ||
* Exception thrown by {@link TaskAssignor#assign(GroupSpec, TopologyDescriber)}}. The exception is only used internally. | ||
*/ | ||
public class TaskAssignorException extends ApiException { | ||
|
||
public TaskAssignorException(String message) { | ||
super(message); | ||
} | ||
|
||
public TaskAssignorException(String message, Throwable cause) { | ||
super(message, cause); | ||
} | ||
} |
Oops, something went wrong.