KAFKA-14462; [18/N] Add GroupCoordinatorService #13812

Merged: 5 commits merged into apache:trunk from KAFKA-14462-18 on Jun 22, 2023

dajac
Member

@dajac dajac commented Jun 5, 2023

This patch introduces the GroupCoordinatorService. This is the new implementation of the group coordinator based on the coordinator runtime introduced in #13795.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Jun 5, 2023
@jolshan
Member

jolshan commented Jun 5, 2023

@dajac Can you share the names of the new files, since this PR still contains all the other changes? Is it just GroupCoordinatorService.java (+ tests)?

@dajac dajac force-pushed the KAFKA-14462-18 branch from 49bec93 to 03d1208 Compare June 6, 2023 06:51
@dajac
Member Author

dajac commented Jun 6, 2023

@jolshan The new files are:

  • GroupCoordinatorService.java + tests
  • GroupCoordinatorConfig.java + tests
  • ReplicatedGroupCoordinator.java + tests

@dajac dajac force-pushed the KAFKA-14462-18 branch 2 times, most recently from 01f4cf5 to 8a1968e Compare June 6, 2023 14:28
* 2) The replay methods which apply records to the hard state. Those are used in the request
* handling as well as during the initial loading of the records from the partitions.
*/
public class ReplicatedGroupCoordinator implements Coordinator<Record> {
Member

Is this called replicated because we replicate the state? (In other words, this is the implementation to get the hard state we already have for the current group coordinator)

Member Author

Right. This is where the state is stored. I don't really like the name but I could not come up with a better one. I am open to suggestions here.

Member

I guess it depends on what the other implementations of coordinator will be 😅

*/
public class GroupCoordinatorConfig {
public static class Builder {
private int numThreads = 1;
Member

Can we please move these to DEFAULT constants? and use ConfigDef?

Please see RemoteLogManagerConfig for inspiration.

Member Author

Interesting pattern used in RemoteLogManagerConfig. Because the properties are defined in the ConfigDef of that class and not in the ConfigDef used in KafkaConfig, none of the remote storage properties appear in the documentation, which is generated from the ConfigDef in KafkaConfig. I suppose that we missed this... We use a similar pattern for RaftConfig, but in that case we define the properties in two places. I am not a fan of this because it is error-prone. It would be better if we could somehow add a ConfigDef to another ConfigDef to make this automatic and transparent. I will play a bit with this...

In this case, all the properties of the group coordinator and their default values are already defined in KafkaConfig and I just wanted to have a container to move them around in the java module.

However, I agree with the constants part of your comment.

Member Author

I have played around a bit with this idea. It creates a pretty big diff, so I have decided to tackle it separately from this PR. I filed https://issues.apache.org/jira/browse/KAFKA-15089 for this purpose.

For this patch, I have reduced GroupCoordinatorConfig to a simple POJO for now.
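
For readers following along, here is a minimal sketch of what a "simple POJO" with the default pulled into a constant could look like. The field name `numThreads` and its default of 1 come from the diff excerpt above; the class name, the constant name `DEFAULT_NUM_THREADS`, the validation and the factory method are assumptions made for illustration, not the code merged in this PR.

```java
// Hypothetical sketch; not the GroupCoordinatorConfig class from this PR.
final class GroupCoordinatorConfigSketch {
    // Default value exposed as a named constant, as requested in the review.
    static final int DEFAULT_NUM_THREADS = 1;

    // Thread count carried by the config (field name taken from the diff excerpt).
    final int numThreads;

    GroupCoordinatorConfigSketch(int numThreads) {
        // Validation is an assumption added for the sketch.
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be at least 1, got " + numThreads);
        }
        this.numThreads = numThreads;
    }

    // Convenience factory that applies the default.
    static GroupCoordinatorConfigSketch withDefaults() {
        return new GroupCoordinatorConfigSketch(DEFAULT_NUM_THREADS);
    }
}
```

A plain container like this only moves values around; the property definitions and documented defaults would still live wherever the broker configuration declares them.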

Member

I was also wondering about how config defs worked with documentation. Thanks for filing this JIRA.

@dajac dajac marked this pull request as ready for review June 14, 2023 13:13
}

/**
* @return TODO
Member

nit: todo here

Member Author

Oops. Fixed.

topicPartitionFor(request.groupId()),
coordinator -> coordinator.consumerGroupHeartbeat(context, request)
).exceptionally(exception -> {
if (exception instanceof UnknownTopicOrPartitionException) {
Member

have we always converted these errors as such? I see in GroupMetadataManager things are slightly different.

Member Author

Yes. We have similar logic in the scala code here. The main difference is that we have to do it at a different place now.

Member

I saw this code, but it seems like we handle it differently, right?

For example, NOT_ENOUGH_REPLICAS is now mapped to NOT_COORDINATOR, whereas it used to be mapped to COORDINATOR_NOT_AVAILABLE.

Member Author

That seems to be a mistake. I will check this tomorrow.
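
To make the point of this thread concrete, here is a rough sketch of translating a failure inside `exceptionally`, following the shape of the diff excerpt. The only mapping shown is the one described above as the previous behaviour (NOT_ENOUGH_REPLICAS to COORDINATOR_NOT_AVAILABLE). The enum, the exception class and the method names are stand-ins invented for this sketch; they are not Kafka's `Errors` class or the code in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Stand-in for error codes; only the values needed for the sketch.
enum SketchError { NONE, NOT_ENOUGH_REPLICAS, COORDINATOR_NOT_AVAILABLE, UNKNOWN_SERVER_ERROR }

// Hypothetical exception type carrying an error code.
class SketchException extends RuntimeException {
    final SketchError error;

    SketchException(SketchError error) {
        super(error.name());
        this.error = error;
    }
}

final class ErrorTranslationSketch {
    // Translate a write failure into the error reported to the client.
    static SketchError translate(Throwable t) {
        // Dependent stages wrap the original failure in a CompletionException.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        if (cause instanceof SketchException
                && ((SketchException) cause).error == SketchError.NOT_ENOUGH_REPLICAS) {
            return SketchError.COORDINATOR_NOT_AVAILABLE;
        }
        // Anything not explicitly mapped falls through to a generic error.
        return SketchError.UNKNOWN_SERVER_ERROR;
    }

    // Recover from a failed future by translating the exception into an error code.
    static CompletableFuture<SketchError> handle(CompletableFuture<SketchError> write) {
        return write.exceptionally(ErrorTranslationSketch::translate);
    }
}
```

Keeping the translation in one function makes it easier to compare the mapping against the previous implementation case by case.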

Iterable<TopicPartition> partitions,
TransactionResult transactionResult
) {
throwIfNotActive();
Member

do we plan to add more to these?

Member Author

What do you mean? Different ones?

Member

More to the methods

Member

Right now we just throw if not active. Just curious if this is all we plan to do here.

Member Author

Yeah, we have to implement all of these methods. We have JIRAs for all of them...

Member

got it. thanks for clarifying. maybe we don't need a todo or anything if we at least have the jira.
Some of the other methods had "not yet implemented" in the body so I wasn't sure about the ones that didn't.

* The number of partitions of the __consumer_offsets topics. This is provided
* when the component is started.
*/
private volatile int numPartitions = -1;
Member

just to confirm this is usually just OffsetsTopicPartitionsProp, but we set to 1 in tests etc?

Member Author

The __consumer_offsets topic is created based on OffsetsTopicPartitionsProp. However, numPartitions is obtained from the metadata cache when the coordinator is started. If the topic was manually created, it could have a different number of partitions.

Member

It should only be done manually via the tests right?

Member Author

numPartitions is set in startup all the time.

Member

Sorry, I mean the topic should only be manually created via tests. Usually it would be done automatically when we use consumers.

Member Author

Right.
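
The behaviour discussed in this thread can be sketched roughly as follows: the partition count starts at -1 and is resolved once, when the component starts, from a supplier. The `volatile int numPartitions = -1` field and the `startup(() -> 10)` call shape are taken from the diff excerpts; the class name and the accessor are assumptions made for this sketch.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch; not the GroupCoordinatorService from this PR.
final class StartupSketch {
    // -1 means "not started yet"; volatile so other threads observe the update.
    private volatile int numPartitions = -1;

    // The supplier is invoked once at startup, for example backed by a metadata
    // lookup, so the value reflects the topic as it exists at that moment.
    void startup(IntSupplier partitionCountSupplier) {
        numPartitions = partitionCountSupplier.getAsInt();
    }

    int numPartitions() {
        return numPartitions;
    }
}
```

Reading the value at startup rather than from the configured property is what allows a manually created topic with a different partition count to be handled.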


service.startup(() -> 10);

assertTrue(service.partitionFor("foo") >= 0);
Member

why don't we just use the hashing algorithm to get the actual partition?

Member Author

That makes sense. Let me change this.
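
As an illustration of the suggestion, a test could compute the expected partition directly instead of only checking that it is non-negative. The sketch below assumes the group id is mapped with the absolute value of `hashCode()` modulo the partition count; that formula and all names here are assumptions for illustration, not quoted from the PR.

```java
// Hypothetical sketch of a hash-based partition lookup.
final class PartitionForSketch {
    // Non-negative value of n; Integer.MIN_VALUE maps to 0 because
    // Math.abs(Integer.MIN_VALUE) overflows and stays negative.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Maps a group id to a partition index in [0, numPartitions).
    static int partitionFor(String groupId, int numPartitions) {
        return abs(groupId.hashCode()) % numPartitions;
    }
}
```

Under that assumption, `"foo".hashCode()` is 101574, so with 10 partitions the test could assert that the partition is exactly 4.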

request
)).thenReturn(result);

assertEquals(result, coordinator.consumerGroupHeartbeat(context, request));
Member

Is this just a test that we don't throw errors?

Member Author

Yeah, it just validates that a successful response is returned to the caller as is.

}

@Test
public void testOnElection() {
Member

did we want to include a test that confirms we throw errors when not active?

Member Author

I added an assertion for this in this test. I did the same for testOnResignation.
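
For context, the kind of guard and assertion being discussed could look roughly like the sketch below. The method name `throwIfNotActive` comes from the diff excerpt; the class, the `onElection` parameters and the use of `IllegalStateException` are stand-ins chosen for this sketch and may differ from the merged code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the GroupCoordinatorService from this PR.
final class ActiveGuardSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);

    void startup() {
        isActive.set(true);
    }

    void shutdown() {
        isActive.set(false);
    }

    // Rejects calls made before startup or after shutdown.
    // The exception type is an assumption made for the sketch.
    private void throwIfNotActive() {
        if (!isActive.get()) {
            throw new IllegalStateException("Coordinator is not active.");
        }
    }

    // Callback guarded the same way as the methods in the diff excerpts;
    // the parameters are hypothetical.
    void onElection(int partitionIndex, int leaderEpoch) {
        throwIfNotActive();
        // The actual handling would go here.
    }
}
```

A test in the spirit of this thread would call `onElection` before `startup()` and assert that it throws, then call it again after `startup()` and assert that it does not.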

@jolshan
Member

jolshan commented Jun 21, 2023

Lots of connect failures when trying to shut down the brokers, let's see if this new run is cleaner.

Member

@jolshan jolshan left a comment

let's make sure the tests look ok before merging.

@dajac
Member Author

dajac commented Jun 22, 2023

Failed tests are not related:

Build / JDK 17 and Scala 2.13 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
1m 50s
Build / JDK 8 and Scala 2.12 / testBumpTransactionalEpoch(String).quorum=kraft – kafka.api.TransactionsTest
1m 14s
Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest

@dajac dajac merged commit a81486e into apache:trunk Jun 22, 2023
@dajac dajac deleted the KAFKA-14462-18 branch June 22, 2023 07:06