diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index bc35cab220f6..5e5c5ff308d6 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -84,7 +84,7 @@ def setup_consumer(self, topic, **kwargs): use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verify correct consumer behavior when the brokers are consecutively restarted. @@ -143,7 +143,7 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verify correct consumer behavior when the consumers in the group are consecutively restarted. @@ -202,7 +202,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor use_new_coordinator=[True], group_protocol=[consumer_group.classic_group_protocol] ) - def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verify correct static consumer behavior when the consumers in the group are restarted. In order to make sure the behavior of static members are different from dynamic ones, we take both static and dynamic @@ -275,7 +275,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not, after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and @@ -317,7 +317,7 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id. @@ -401,7 +401,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): partition = TopicPartition(self.TOPIC, 0) consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) @@ -459,7 +459,7 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): partition = TopicPartition(self.TOPIC, 0) consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) @@ -505,7 +505,7 @@ def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum use_new_coordinator=[True], group_protocol=consumer_group.all_group_protocols ) - def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): + def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None): """ Verifies correct group rebalance behavior as consumers are started and stopped. In particular, this test verifies that the partition is readable after every @@ -570,7 +570,7 @@ def __init__(self, test_context): "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"], metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True], - group_protocol=[consumer_group.classic_group_protocol], + group_protocol=[consumer_group.classic_group_protocol] ) @matrix( metadata_quorum=[quorum.isolated_kraft], @@ -578,7 +578,7 @@ def __init__(self, test_context): group_protocol=[consumer_group.consumer_group_protocol], group_remote_assignor=consumer_group.all_remote_assignors ) - def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None): + def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None): """ Verify assignment strategy correctness: each partition is assigned to exactly one consumer instance. diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 825e5ed2c6ec..0da56f1340b7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -634,15 +634,15 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] } } - String groupProtocol = res.getString("groupProtocol"); + GroupProtocol groupProtocol = GroupProtocol.of(res.getString("groupProtocol")); + consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); // 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol. // The two implementations use slightly different configuration, hence these arguments are conditional. // // See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the // command line arguments are passed in by the system test framework. - if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) { - consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); + if (groupProtocol == GroupProtocol.CONSUMER) { String groupRemoteAssignor = res.getString("groupRemoteAssignor"); if (groupRemoteAssignor != null)