Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added consumer group migration test when group topic is not present #4547

Merged
merged 1 commit into from
May 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion tests/rptest/tests/consumer_offsets_migration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from rptest.clients.types import TopicSpec
from rptest.services.failure_injector import FailureInjector, FailureSpec
from rptest.tests.end_to_end import EndToEndTest
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RedpandaService, ResourceSettings
from rptest.services.redpanda import CHAOS_LOG_ALLOW_LIST, RESTART_LOG_ALLOW_LIST, RedpandaService, ResourceSettings
from rptest.clients.default import DefaultClient
from ducktape.utils.util import wait_until

Expand Down Expand Up @@ -139,3 +139,80 @@ def _consumer_offsets_present():
self.run_validation(min_records=100000,
producer_timeout_sec=300,
consumer_timeout_sec=180)

@cluster(num_nodes=5, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_cluster_is_available_during_upgrade_without_group_topic(self):
    '''
    Validates that cluster is available and healthy during
    upgrade when `kafka_internal::group` topic is not present.

    Steps:
      1. start a 5 node cluster with `__REDPANDA_LOGICAL_VERSION` set to 1
         and create a regular topic,
      2. assert that `__consumer_offsets` is not listed,
      3. set `__REDPANDA_LOGICAL_VERSION` to 2 in the service environment,
      4. stop one node that is not the controller, wait until the admin
         API reports it as not alive, start it again and wait until every
         reported broker is alive and exposes `disk_space`.
    '''

    # set redpanda logical version to value without __consumer_offsets support
    self.redpanda = RedpandaService(
        self.test_context,
        5,
        extra_rp_conf={
            "group_topic_partitions": 16,
            "default_topic_replications": 3,
        },
        environment={"__REDPANDA_LOGICAL_VERSION": 1})

    self.redpanda.start()
    self._client = DefaultClient(self.redpanda)

    spec = TopicSpec(partition_count=6, replication_factor=3)
    self.client().create_topic(spec)
    self.topic = spec.name

    # One admin client is enough for all polling helpers below.
    admin = Admin(self.redpanda)

    def cluster_is_stable():
        """Return True when enough brokers are reported and all are healthy."""
        brokers = admin.get_brokers()
        # NOTE(review): the cluster has 5 nodes but only 3 reported brokers
        # are required here -- confirm this threshold is intentional.
        if len(brokers) < 3:
            return False

        for b in brokers:
            self.logger.debug(f"broker: {b}")
            if not (b['is_alive'] and 'disk_space' in b):
                return False

        return True

    def node_stopped(node_id):
        """Return True when broker `node_id` is reported as not alive."""
        for b in admin.get_brokers():
            self.logger.debug(f"broker: {b}")
            if b['node_id'] == node_id:
                return not b['is_alive']

        # the broker is not listed at all, keep polling
        return False

    def get_raft0_follower():
        """Return a random node whose index differs from the controller's."""
        # NOTE(review): `controller` is read as an attribute here; if it is
        # a method on RedpandaService it has to be called -- confirm.
        ctrl_idx = self.redpanda.idx(self.redpanda.controller)
        followers = [
            node for node in self.redpanda.nodes
            if self.redpanda.idx(node) != ctrl_idx
        ]
        return random.choice(followers)

    kcl = KCL(self.redpanda)

    # check that consumer offsets topic is not present
    topics = set(kcl.list_topics())

    assert "__consumer_offsets" not in topics

    # enable consumer offsets support
    self.redpanda.set_environment({"__REDPANDA_LOGICAL_VERSION": 2})

    # restart node that is not controller
    n = get_raft0_follower()
    node_id = self.redpanda.idx(n)
    self.logger.info(f"restarting node {n.account.hostname}")
    self.redpanda.stop_node(n, timeout=60)
    # wait until the rest of the cluster reports the stopped node as not alive
    wait_until(lambda: node_stopped(node_id),
               timeout_sec=90,
               backoff_sec=2,
               err_msg=f"node {node_id} was not reported as stopped")
    self.redpanda.start_node(n)
    # wait until all reported brokers are alive again after the restart
    wait_until(cluster_is_stable,
               timeout_sec=90,
               backoff_sec=2,
               err_msg="cluster did not become stable after node restart")