Skip to content

Commit

Permalink
KAFKA-17497 Add e2e for zk migration with old controller (#17131)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
frankvicky authored and chia7712 committed Sep 10, 2024
1 parent b22e848 commit 2dc3ee0
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions tests/kafkatest/tests/core/zookeeper_migration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@
import time

from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster
from ducktape.errors import TimeoutError

from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, config_property
from kafkatest.services.kafka.config_property import CLUSTER_ID
from kafkatest.services.kafka.quorum import isolated_kraft, ServiceQuorumInfo, zk
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import DEV_BRANCH, LATEST_3_4
from kafkatest.version import DEV_BRANCH, LATEST_3_4, LATEST_3_7, KafkaVersion


class TestMigration(ProduceConsumeValidateTest):
Expand All @@ -51,10 +51,10 @@ def wait_until_rejoin(self):
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

def do_migration(self, roll_controller = False, downgrade_to_zk = False):
def do_migration(self, roll_controller=False, downgrade_to_zk=False, from_kafka_version=str(DEV_BRANCH)):
# Start up KRaft controller in migration mode
remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=DEV_BRANCH,
controller = KafkaService(self.test_context, num_nodes=1, zk=self.zk, version=KafkaVersion(from_kafka_version),
allow_zk_with_kraft=True,
isolated_kafka=self.kafka,
server_prop_overrides=[["zookeeper.connect", self.zk.connect_setting()],
Expand Down Expand Up @@ -86,20 +86,23 @@ def do_migration(self, roll_controller = False, downgrade_to_zk = False):
controller.start_node(node)

@cluster(num_nodes=7)
@parametrize(roll_controller = True)
@parametrize(roll_controller = False)
def test_online_migration(self, roll_controller):
@matrix(roll_controller=[True, False], from_kafka_version=[str(DEV_BRANCH), str(LATEST_3_7)])
def test_online_migration(self, roll_controller, from_kafka_version):
zk_quorum = partial(ServiceQuorumInfo, zk)
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)

server_prop_overrides = [["zookeeper.metadata.migration.enable", "false"]]

if from_kafka_version != str(DEV_BRANCH):
server_prop_overrides.append([config_property.INTER_BROKER_PROTOCOL_VERSION, from_kafka_version])

self.kafka = KafkaService(self.test_context,
num_nodes=3,
zk=self.zk,
version=DEV_BRANCH,
quorum_info_provider=zk_quorum,
allow_zk_with_kraft=True,
server_prop_overrides=[
["zookeeper.metadata.migration.enable", "false"],
])
server_prop_overrides=server_prop_overrides)
self.kafka.security_protocol = "PLAINTEXT"
self.kafka.interbroker_security_protocol = "PLAINTEXT"
self.zk.start()
Expand Down Expand Up @@ -128,7 +131,7 @@ def test_online_migration(self, roll_controller):
self.topic, consumer_timeout_ms=30000,
message_validator=is_int, version=DEV_BRANCH)

self.run_produce_consume_validate(core_test_action=partial(self.do_migration, roll_controller = roll_controller))
self.run_produce_consume_validate(core_test_action=partial(self.do_migration, roll_controller=roll_controller, from_kafka_version=from_kafka_version))
self.kafka.stop()

@cluster(num_nodes=7)
Expand Down

0 comments on commit 2dc3ee0

Please sign in to comment.