From c13e7b2a3ed84a395e68ffead9ea518153afbcea Mon Sep 17 00:00:00 2001
From: Lucas Brutschy <lbrutschy@confluent.io>
Date: Thu, 2 Jan 2025 16:31:35 +0100
Subject: [PATCH] KAFKA-18319: Add task assignor interfaces (#18270)

Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.

Reviewers: Bill Bejeck <bbejeck@gmail.com>, David Jacot <djacot@confluent.io>
---
 .../assignor/AssignmentMemberSpec.java        |  60 ++++
 .../streams/assignor/GroupAssignment.java     |  33 +++
 .../group/streams/assignor/GroupSpec.java     |  36 +++
 .../group/streams/assignor/GroupSpecImpl.java |  36 +++
 .../streams/assignor/MemberAssignment.java    |  42 +++
 .../group/streams/assignor/MockAssignor.java  | 106 +++++++
 .../group/streams/assignor/TaskAssignor.java  |  41 +++
 .../assignor/TaskAssignorException.java       |  33 +++
 .../group/streams/assignor/TaskId.java        |  41 +++
 .../streams/assignor/TopologyDescriber.java   |  50 ++++
 .../streams/assignor/GroupSpecImplTest.java   |  62 ++++
 .../streams/assignor/MockAssignorTest.java    | 267 ++++++++++++++++++
 12 files changed, 807 insertions(+)
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignor.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignorException.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskId.java
 create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
 create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java
 create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java
new file mode 100644
index 0000000000000..99953b09d7159
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The assignment specification for a Streams group member.
+ *
+ * @param instanceId   The instance ID if provided.
+ * @param rackId       The rack ID if provided.
+ * @param activeTasks  Reconciled active tasks
+ * @param standbyTasks Reconciled standby tasks
+ * @param warmupTasks  Reconciled warm-up tasks
+ * @param processId    The process ID.
+ * @param clientTags   The client tags for a rack-aware assignment.
+ * @param taskOffsets  The last received cumulative task offsets of assigned tasks or dormant tasks.
+ */
+public record AssignmentMemberSpec(Optional<String> instanceId,
+                                   Optional<String> rackId,
+                                   Map<String, Set<Integer>> activeTasks,
+                                   Map<String, Set<Integer>> standbyTasks,
+                                   Map<String, Set<Integer>> warmupTasks,
+                                   String processId,
+                                   Map<String, String> clientTags,
+                                   Map<TaskId, Long> taskOffsets,
+                                   Map<TaskId, Long> taskEndOffsets
+) {
+
+    public AssignmentMemberSpec {
+        Objects.requireNonNull(instanceId);
+        Objects.requireNonNull(rackId);
+        activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+        Objects.requireNonNull(processId);
+        clientTags = Collections.unmodifiableMap(Objects.requireNonNull(clientTags));
+        taskOffsets = Collections.unmodifiableMap(Objects.requireNonNull(taskOffsets));
+        taskEndOffsets = Collections.unmodifiableMap(Objects.requireNonNull(taskEndOffsets));
+    }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
new file mode 100644
index 0000000000000..a97cdc33b7909
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The task assignment for a streams group.
+ *
+ * @param members The member assignments keyed by member id.
+ */
+public record GroupAssignment(Map<String, MemberAssignment> members) {
+
+    public GroupAssignment {
+        Objects.requireNonNull(members);
+    }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java
new file mode 100644
index 0000000000000..1a8e7edc01c63
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpec.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Map;
+
+/**
+ * The group metadata specifications required to compute the target assignment.
+ */
+public interface GroupSpec {
+
+    /**
+     * @return Member metadata keyed by member Id.
+     */
+    Map<String, AssignmentMemberSpec> members();
+
+    /**
+     * @return Any configurations passed to the assignor.
+     */
+    Map<String, String> assignmentConfigs();
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java
new file mode 100644
index 0000000000000..caa82ed2cb21c
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The assignment specification for a Streams group.
+ *
+ * @param members           The member metadata keyed by member ID.
+ * @param assignmentConfigs Any configurations passed to the assignor.
+ */
+public record GroupSpecImpl(Map<String, AssignmentMemberSpec> members,
+                            Map<String, String> assignmentConfigs) implements GroupSpec {
+
+    public GroupSpecImpl {
+        Objects.requireNonNull(members);
+        Objects.requireNonNull(assignmentConfigs);
+    }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java
new file mode 100644
index 0000000000000..22f908b825c52
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MemberAssignment.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * The task assignment for a Streams group member.
+ *
+ * @param activeTasks The target tasks assigned to this member keyed by subtopologyId.
+ */
+public record MemberAssignment(Map<String, Set<Integer>> activeTasks,
+                               Map<String, Set<Integer>> standbyTasks,
+                               Map<String, Set<Integer>> warmupTasks) {
+
+    public MemberAssignment {
+        Objects.requireNonNull(activeTasks);
+        Objects.requireNonNull(standbyTasks);
+        Objects.requireNonNull(warmupTasks);
+    }
+
+    public static MemberAssignment empty() {
+        return new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
new file mode 100644
index 0000000000000..ce0bc101101ec
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Mock implementation of {@link TaskAssignor} that assigns tasks to members in a round-robin fashion, with a bit of stickiness.
+ */
+public class MockAssignor implements TaskAssignor {
+
+    public static final String MOCK_ASSIGNOR_NAME = "mock";
+
+    @Override
+    public String name() {
+        return MOCK_ASSIGNOR_NAME;
+    }
+
+    @Override
+    public GroupAssignment assign(
+        final GroupSpec groupSpec,
+        final TopologyDescriber topologyDescriber
+    ) throws TaskAssignorException {
+
+        Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
+        Map<String, String[]> subtopologyToActiveMember = new HashMap<>();
+
+        for (String subtopology : topologyDescriber.subtopologies()) {
+            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            subtopologyToActiveMember.put(subtopology, new String[numberOfPartitions]);
+        }
+
+        // Copy existing assignment and fill temporary data structures
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) {
+            final String memberId = memberEntry.getKey();
+            final AssignmentMemberSpec memberSpec = memberEntry.getValue();
+
+            Map<String, Set<Integer>> activeTasks = new HashMap<>(memberSpec.activeTasks());
+
+            newTargetAssignment.put(memberId, new MemberAssignment(activeTasks, new HashMap<>(), new HashMap<>()));
+            for (Map.Entry<String, Set<Integer>> entry : activeTasks.entrySet()) {
+                final String subtopologyId = entry.getKey();
+                final Set<Integer> taskIds = entry.getValue();
+                final String[] activeMembers = subtopologyToActiveMember.get(subtopologyId);
+                for (int taskId : taskIds) {
+                    if (activeMembers[taskId] != null) {
+                        throw new TaskAssignorException(
+                            "Task " + taskId + " of subtopology " + subtopologyId + " is assigned to multiple members");
+                    }
+                    activeMembers[taskId] = memberId;
+                }
+            }
+        }
+
+        // Define priority queue to sort members by task count
+        PriorityQueue<MemberAndTaskCount> memberAndTaskCount = new PriorityQueue<>(Comparator.comparingInt(m -> m.taskCount));
+        memberAndTaskCount.addAll(
+            newTargetAssignment.keySet().stream()
+                .map(memberId -> new MemberAndTaskCount(memberId,
+                    newTargetAssignment.get(memberId).activeTasks().values().stream().mapToInt(Set::size).sum()))
+                .collect(Collectors.toSet())
+        );
+
+        // Assign unassigned tasks to members with the fewest tasks
+        for (Map.Entry<String, String[]> entry : subtopologyToActiveMember.entrySet()) {
+            final String subtopologyId = entry.getKey();
+            final String[] activeMembers = entry.getValue();
+            for (int i = 0; i < activeMembers.length; i++) {
+                if (activeMembers[i] == null) {
+                    final MemberAndTaskCount m = memberAndTaskCount.poll();
+                    if (m == null) {
+                        throw new TaskAssignorException("No member available to assign task " + i + " of subtopology " + subtopologyId);
+                    }
+                    newTargetAssignment.get(m.memberId).activeTasks().computeIfAbsent(subtopologyId, k -> new HashSet<>()).add(i);
+                    activeMembers[i] = m.memberId;
+                    memberAndTaskCount.add(new MemberAndTaskCount(m.memberId, m.taskCount + 1));
+                }
+            }
+        }
+
+        return new GroupAssignment(newTargetAssignment);
+    }
+
+    private record MemberAndTaskCount(String memberId, int taskCount) {
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignor.java
new file mode 100644
index 0000000000000..7b4874a9bf88e
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+/**
+ * Server side task assignor used by streams groups.
+ */
+public interface TaskAssignor {
+
+    /**
+     * Unique name for this assignor.
+     */
+    String name();
+
+    /**
+     * Assigns tasks to group members based on the given assignment specification and topic metadata.
+     *
+     * @param groupSpec         The assignment spec which includes member metadata.
+     * @param topologyDescriber The task metadata describer.
+     * @return The new assignment for the group.
+     */
+    GroupAssignment assign(
+        GroupSpec groupSpec,
+        TopologyDescriber topologyDescriber
+    ) throws TaskAssignorException;
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignorException.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignorException.java
new file mode 100644
index 0000000000000..2fda6a9e9ec62
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskAssignorException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.apache.kafka.common.errors.ApiException;
+
+/**
+ * Exception thrown by {@link TaskAssignor#assign(GroupSpec, TopologyDescriber)}}. The exception is only used internally.
+ */
+public class TaskAssignorException extends ApiException {
+
+    public TaskAssignorException(String message) {
+        super(message);
+    }
+
+    public TaskAssignorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskId.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskId.java
new file mode 100644
index 0000000000000..3a9e594a82bf7
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * The identifier for a task
+ *
+ * @param subtopologyId The unique identifier of the subtopology.
+ * @param partition     The partition of the input topics this task is processing.
+ */
+public record TaskId(String subtopologyId, int partition) implements Comparable<TaskId> {
+
+    public TaskId {
+        Objects.requireNonNull(subtopologyId);
+    }
+
+    @Override
+    public int compareTo(final TaskId other) {
+        return Comparator.comparing(TaskId::subtopologyId)
+            .thenComparingInt(TaskId::partition)
+            .compare(this, other);
+    }
+
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
new file mode 100644
index 0000000000000..2f913bf5514e8
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * The subscribed topic describer is used by the {@link TaskAssignor} to obtain topic and task metadata of the groups topology.
+ */
+public interface TopologyDescriber {
+
+    /**
+     * @return The list of subtopologies IDs.
+     */
+    List<String> subtopologies();
+
+    /**
+     * The number of tasks for the given subtopology.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     *
+     * @return The number of tasks corresponding to the given subtopology ID.
+     * @throws NoSuchElementException if subtopology does not exist in the topology.
+     */
+    int numTasks(String subtopologyId) throws NoSuchElementException;
+
+    /**
+     * Whether the given subtopology is stateful.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     * @return true if the subtopology is stateful, false otherwise.
+     */
+    boolean isStateful(String subtopologyId);
+
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java
new file mode 100644
index 0000000000000..5deccb9717f17
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/GroupSpecImplTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class GroupSpecImplTest {
+
+    private Map<String, AssignmentMemberSpec> members;
+    private GroupSpecImpl groupSpec;
+
+    @BeforeEach
+    void setUp() {
+        members = new HashMap<>();
+
+        members.put("test-member", new AssignmentMemberSpec(
+            Optional.of("test-instance"),
+            Optional.of("test-rack"),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        ));
+
+        groupSpec = new GroupSpecImpl(
+            members,
+            new HashMap<>()
+        );
+    }
+
+    @Test
+    void testMembers() {
+        assertEquals(members, groupSpec.members());
+    }
+
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
new file mode 100644
index 0000000000000..25dada072df13
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.assignor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class MockAssignorTest {
+
+    private final MockAssignor assignor = new MockAssignor();
+
+    @Test
+    public void testZeroMembers() {
+
+        TaskAssignorException ex = assertThrows(TaskAssignorException.class, () -> assignor.assign(
+            new GroupSpecImpl(
+                Collections.emptyMap(),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(5, Collections.singletonList("test-subtopology"))
+        ));
+
+        assertEquals("No member available to assign task 0 of subtopology test-subtopology", ex.getMessage());
+    }
+
+    @Test
+    public void testDoubleAssignment() {
+
+        final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.singletonMap("test-subtopology", new HashSet<>(List.of(0))),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.singletonMap("test-subtopology", new HashSet<>(List.of(0))),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        TaskAssignorException ex = assertThrows(TaskAssignorException.class, () -> assignor.assign(
+            new GroupSpecImpl(
+                Map.of("member1", memberSpec1, "member2", memberSpec2),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(5, Collections.singletonList("test-subtopology"))
+        ));
+
+        assertEquals("Task 0 of subtopology test-subtopology is assigned to multiple members", ex.getMessage());
+    }
+
+    @Test
+    public void testBasicScenario() {
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                Collections.emptyMap(),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(5, Collections.emptyList())
+        );
+
+        assertEquals(0, result.members().size());
+    }
+
+
+    @Test
+    public void testSingleMember() {
+
+        final AssignmentMemberSpec memberSpec = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                Collections.singletonMap("test_member", memberSpec),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(4, List.of("test-subtopology"))
+        );
+
+        assertEquals(1, result.members().size());
+        final MemberAssignment testMember = result.members().get("test_member");
+        assertNotNull(testMember);
+        assertEquals(mkMap(
+            mkEntry("test-subtopology", Set.of(0, 1, 2, 3))
+        ), testMember.activeTasks());
+    }
+
+
+    @Test
+    public void testTwoMembersTwoSubtopologies() {
+
+        final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(4, List.of("test-subtopology1", "test-subtopology2"))
+        );
+
+        final Map<String, Set<Integer>> expected1 = mkMap(
+            mkEntry("test-subtopology1", Set.of(1, 3)),
+            mkEntry("test-subtopology2", Set.of(1, 3))
+        );
+        final Map<String, Set<Integer>> expected2 = mkMap(
+            mkEntry("test-subtopology1", Set.of(0, 2)),
+            mkEntry("test-subtopology2", Set.of(0, 2))
+        );
+
+        assertEquals(2, result.members().size());
+        final MemberAssignment testMember1 = result.members().get("test_member1");
+        final MemberAssignment testMember2 = result.members().get("test_member2");
+        assertNotNull(testMember1);
+        assertNotNull(testMember2);
+        assertTrue(expected1.equals(testMember1.activeTasks()) || expected2.equals(testMember1.activeTasks()));
+        assertTrue(expected1.equals(testMember2.activeTasks()) || expected2.equals(testMember2.activeTasks()));
+    }
+
+    @Test
+    public void testTwoMembersTwoSubtopologiesStickiness() {
+
+        final AssignmentMemberSpec memberSpec1 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            mkMap(
+                mkEntry("test-subtopology1", new HashSet<>(List.of(0, 2, 3))),
+                mkEntry("test-subtopology2", new HashSet<>(List.of(0)))
+            ),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+
+        final AssignmentMemberSpec memberSpec2 = new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            mkMap(
+                mkEntry("test-subtopology1", new HashSet<>(List.of(1))),
+                mkEntry("test-subtopology2", new HashSet<>(List.of(3)))
+            ),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            "test-process",
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap()
+        );
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(
+                mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)),
+                new HashMap<>()
+            ),
+            new TopologyDescriberImpl(4, List.of("test-subtopology1", "test-subtopology2"))
+        );
+
+        assertEquals(2, result.members().size());
+        final MemberAssignment testMember1 = result.members().get("test_member1");
+        final MemberAssignment testMember2 = result.members().get("test_member2");
+        assertNotNull(testMember1);
+        assertNotNull(testMember2);
+        assertEquals(mkMap(
+            mkEntry("test-subtopology1", Set.of(0, 2, 3)),
+            mkEntry("test-subtopology2", Set.of(0))
+        ), testMember1.activeTasks());
+        assertEquals(mkMap(
+            mkEntry("test-subtopology1", Set.of(1)),
+            mkEntry("test-subtopology2", Set.of(1, 2, 3))
+        ), testMember2.activeTasks());
+    }
+
+    private record TopologyDescriberImpl(int numPartitions, List<String> subtopologies) implements TopologyDescriber {
+
+        @Override
+        public List<String> subtopologies() {
+            return subtopologies;
+        }
+
+        @Override
+        public int numTasks(String subtopologyId) {
+            return numPartitions;
+        }
+
+        @Override
+        public boolean isStateful(String subtopologyId) {
+            return false;
+        }
+
+    }
+
+}