Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18319: Add task assignor interfaces #18270

Merged
merged 4 commits into from
Jan 2, 2025

Conversation

lucasbru
Copy link
Member

@lucasbru lucasbru commented Dec 19, 2024

Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.taskassignor;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider moving this to the streams package.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to put all the streams related classes in a streams package. Also, the public API will have to move to the group-coordinator-api module at some point.

* @param clientTags The client tags for a rack-aware assignment.
* @param taskOffsets The last received cumulative task offsets of assigned tasks or dormant tasks.
*/
public record AssignmentMemberSpec(Optional<String> instanceId,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared to the feature branch, I switched to Java 17 records, which compresses the code a bit.

/**
* The group metadata specifications required to compute the target assignment.
*/
public interface GroupSpec {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We haven't included pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later .

/**
* Mock implementation of {@link TaskAssignor} that assigns tasks to members in a round-robin fashion, with a bit of stickiness.
*/
public class MockAssignor implements TaskAssignor {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included the mock assignor, since it might make sense to use it, if we want to exclude the complexities of the sticky assignor and use a dead-simple assignor instead. We may or may not remove it from the code before 4.1.

@cadonna cadonna added streams KIP-1071 PRs related to KIP-1071 core Kafka Broker labels Dec 19, 2024
Copy link
Member

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @lucasbru - overall LGTM with a few minor comments for your considerations.

Comment on lines 67 to 68
throw new TaskAssignorException(
"Task " + taskId + " of subtopology " + subtopologyId + " is assigned to multiple members.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This condition isn't covered in the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +91 to +92
if (m == null) {
throw new TaskAssignorException("No member available to assign task " + i + " of subtopology " + subtopologyId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This condition isn't covered in the test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* @return Any configurations passed to the assignor.
*/
Map<String, String> assignmentConfigs();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: assignmentConfigs() isn't covered in the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only an interface. The mockassingor does not use assignment confics, so it's not used in the test. This will be tested when the interface gets a production implementation (sticky assignor).

* @param subtopologyId String identifying the subtopology.
* @return true if the subtopology is stateful, false otherwise.
*/
boolean isStateful(String subtopologyId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isStateful isn't covered in the tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only an interface. The mockassingor does not distinguish between stateless and stateful tasks, so it's not used in the test. This will be tested when the interface gets a production implementation.

@lucasbru lucasbru requested a review from bbejeck January 2, 2025 12:18
@lucasbru lucasbru merged commit dd0fd55 into apache:trunk Jan 2, 2025
9 checks passed
airlock-confluentinc bot pushed a commit to confluentinc/kafka that referenced this pull request Jan 3, 2025
Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.

Reviewers: Bill Bejeck <[email protected]>, David Jacot <[email protected]>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
Introduces interfaces for defining task assignors. Task assignors are pure functions, mapping the state of the group and a topology to a target assignment. We include a mock assignor, which we will be able to use when testing / benchmarking without the complexities of the sticky task assignor and the high-availability task assignor. We may remove the mock assignor in before the streams rebalance protocol goes GA.

The consumer groups introduce these interfaces to establish a clear separation between the group coordinator code and the pluggable assignors, which may live outside the group coordinator code. We have removed pluggable assignors in KIP-1071, but I think it still makes sense to keep these interfaces for having a clean interface for people to code against. This will pay off, if we plan on making the task assignors pluggable later.

Reviewers: Bill Bejeck <[email protected]>, David Jacot <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Kafka Broker KIP-1071 PRs related to KIP-1071 streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants