Skip to content

Commit

Permalink
KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
Browse files Browse the repository at this point in the history
Reviewers: Lianet Magrans <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
kirktrue authored Jan 14, 2025
1 parent d7e5d0a commit 45e3c21
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
20 changes: 10 additions & 10 deletions tests/kafkatest/tests/client/consumer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def setup_consumer(self, topic, **kwargs):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
Expand Down Expand Up @@ -143,7 +143,7 @@ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordina
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verify correct consumer behavior when the consumers in the group are consecutively restarted.
Expand Down Expand Up @@ -202,7 +202,7 @@ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quor
use_new_coordinator=[True],
group_protocol=[consumer_group.classic_group_protocol]
)
def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
sure the behavior of static members are different from dynamic ones, we take both static and dynamic
Expand Down Expand Up @@ -275,7 +275,7 @@ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, stat
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not,
after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and
Expand Down Expand Up @@ -317,7 +317,7 @@ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quor
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
Expand Down Expand Up @@ -401,7 +401,7 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
partition = TopicPartition(self.TOPIC, 0)

consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
Expand Down Expand Up @@ -459,7 +459,7 @@ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quor
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
partition = TopicPartition(self.TOPIC, 0)

consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
Expand Down Expand Up @@ -505,7 +505,7 @@ def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Verifies correct group rebalance behavior as consumers are started and stopped.
In particular, this test verifies that the partition is readable after every
Expand Down Expand Up @@ -570,15 +570,15 @@ def __init__(self, test_context):
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
group_protocol=[consumer_group.classic_group_protocol],
group_protocol=[consumer_group.classic_group_protocol]
)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
group_protocol=[consumer_group.consumer_group_protocol],
group_remote_assignor=consumer_group.all_remote_assignors
)
def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
"""
Verify assignment strategy correctness: each partition is assigned to exactly
one consumer instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,15 +634,15 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
}
}

String groupProtocol = res.getString("groupProtocol");
GroupProtocol groupProtocol = GroupProtocol.of(res.getString("groupProtocol"));
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());

// 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
// The two implementations use slightly different configuration, hence these arguments are conditional.
//
// See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the
// command line arguments are passed in by the system test framework.
if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
if (groupProtocol == GroupProtocol.CONSUMER) {
String groupRemoteAssignor = res.getString("groupRemoteAssignor");

if (groupRemoteAssignor != null)
Expand Down

0 comments on commit 45e3c21

Please sign in to comment.