Skip to content

Commit

Permalink
Merge pull request redpanda-data#21472 from mmaslankaprv/static-membr…
Browse files Browse the repository at this point in the history
…ship-test

tests: added test verifying group rebalance with static assignment
  • Loading branch information
mmaslankaprv authored Jul 29, 2024
2 parents 711fb12 + 741e135 commit 3cef311
Showing 1 changed file with 327 additions and 1 deletion.
328 changes: 327 additions & 1 deletion tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
# by the Apache License, Version 2.0

from dataclasses import dataclass
import random
import threading
import time
from typing import Dict, List

from rptest.clients.default import DefaultClient
from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
Expand All @@ -25,13 +30,16 @@
from rptest.utils.mode_checks import skip_debug_mode

from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import parametrize, ok_to_fail
from kafka import KafkaConsumer, TopicPartition
from kafka import errors as kerr
from kafka.admin import KafkaAdminClient
from kafka.protocol.commit import OffsetFetchRequest_v3
from kafka.protocol.api import Request, Response
import kafka.protocol.types as types
from confluent_kafka.admin import AdminClient
from confluent_kafka import ConsumerGroupState
from confluent_kafka import Consumer


class ConsumerGroupTest(RedpandaTest):
Expand Down Expand Up @@ -639,3 +647,321 @@ class OffsetFetchRequest_v5(Request):
API_VERSION = 5
RESPONSE_TYPE = OffsetFetchResponse_v5
SCHEMA = OffsetFetchRequest_v3.SCHEMA


class TestConsumer:
    """Kafka consumer with a fixed ``group.instance.id`` run on its own thread.

    The instance subscribes to ``topic`` as a member of ``group``, identifies
    itself as ``consumer-<id>`` and remembers the offset of the last message
    it polled successfully.

    NOTE(review): this module defines a second class named ``TestConsumer``
    further down; that later definition rebinds the name, so this class is
    not the one the tests in this file end up instantiating.
    """
    def __init__(self, bootstrap_servers, group, topic, id, logger):
        """Store the settings and start the polling thread.

        :param bootstrap_servers: value passed as ``bootstrap.servers``
        :param group: consumer group id
        :param topic: topic to subscribe to
        :param id: suffix used for the thread name and ``group.instance.id``
        :param logger: object exposing ``info`` and ``error``
        """
        self.bootstrap_servers = bootstrap_servers
        self.id = id
        self.group = group
        self.topic = topic
        self.logger = logger
        self.stopped = threading.Event()
        self.restart = threading.Event()
        self.restarted = threading.Event()
        self.lock = threading.Lock()
        # -1 means "nothing consumed yet"; an integer keeps comparisons such
        # as `get_last_consumed() >= 0` valid before the first message.
        self.last_consumed = -1
        self.consumer_thread = threading.Thread(name=f'consumer-{id}',
                                                target=self.loop)
        self.consumer_thread.daemon = True
        # Start the thread only after every attribute used by loop() exists,
        # otherwise the thread can observe a half-initialised object.
        self.consumer_thread.start()

    def stop(self):
        """Ask the polling thread to exit and wait until it has finished."""
        self.logger.info(f"stopping consumer with id: {self.id}")
        self.stopped.set()
        self.consumer_thread.join()

    def create_consumer_client(self):
        """Create a new client in ``self.consumer`` and subscribe to the topic."""
        self.consumer = Consumer({
            "group.id": self.group,
            "group.instance.id": f"consumer-{self.id}",
            'bootstrap.servers': self.bootstrap_servers,
            "session.timeout.ms": 10000,
            'auto.offset.reset': 'earliest',
            'enable.auto.offset.store': False,
        })
        self.consumer.subscribe([self.topic])

    def loop(self):
        """Thread body: poll until ``stop()`` is called, then close the client."""
        self.create_consumer_client()
        self.logger.info(f"starting consumer with id: {self.id}")
        while not self.stopped.is_set():
            if self.restart.is_set():
                # Replace the client with a new one using the same
                # group.instance.id, then signal restart_consumer().
                self.logger.info(f"restarting consumer with id: {self.id}")
                self.consumer.close()
                self.create_consumer_client()
                self.consumer.poll(0.5)
                self.restart.clear()
                self.restarted.set()

            try:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    self.logger.error(
                        f"consumer {self.id} error - {msg.error()}")
                    continue

                # Automatic offset storing is disabled in the client config,
                # so the offset is stored explicitly for each message.
                self.consumer.store_offsets(msg)
                with self.lock:
                    self.last_consumed = msg.offset()

            except Exception as e:
                # Log and keep polling.
                self.logger.error(f"consumer {self.id} error - {e}")

        self.consumer.close()

    def get_last_consumed(self):
        """Return the offset of the last consumed message, or -1 if none yet."""
        with self.lock:
            return self.last_consumed

    def restart_consumer(self):
        """Request a client restart and block until the thread has done it.

        There is no timeout on the wait: the call only returns once the
        polling thread has set ``restarted``.
        """
        self.restart.set()
        self.restarted.wait()
        self.restarted.clear()


class TestConsumer:
    """Consumer with a fixed ``group.instance.id`` polling on its own thread.

    The polling thread is started by the constructor. It records the offset
    of the last message polled without error and can be told to replace its
    client (``restart_consumer``) or to shut down (``stop``).
    """
    def __init__(self,
                 bootstrap_servers,
                 group,
                 topic,
                 id,
                 logger,
                 session_timeout_ms=10000):
        """Store the settings and launch the polling thread.

        :param bootstrap_servers: value passed as ``bootstrap.servers``
        :param group: consumer group id
        :param topic: topic to subscribe to
        :param id: suffix used for the thread name and ``group.instance.id``
        :param logger: logger handed to the client and used for messages
        :param session_timeout_ms: value passed as ``session.timeout.ms``
        """
        # Connection and identity settings.
        self.bootstrap_servers = bootstrap_servers
        self.group = group
        self.topic = topic
        self.id = id
        self.session_timeout_ms = session_timeout_ms
        self.logger = logger

        # State shared between the caller and the polling thread.
        self.stopped = threading.Event()
        self.restart = threading.Event()
        self.restarted = threading.Event()
        self.lock = threading.Lock()
        self.last_consumed = -1

        # The thread is started last so loop() sees a complete object.
        worker = threading.Thread(name=f'consumer-{id}',
                                  target=self.loop,
                                  daemon=True)
        self.consumer_thread = worker
        worker.start()

    def stop(self):
        """Signal the polling thread to finish and wait for it to exit."""
        self.logger.info(f"stopping consumer with id: {self.id}")
        self.stopped.set()
        self.consumer_thread.join()

    def is_stopped(self):
        """Return True once ``stop()`` has been requested."""
        return self.stopped.is_set()

    def create_consumer_client(self):
        """Create a new client in ``self.consumer`` and subscribe to the topic."""
        settings = {
            "group.id": self.group,
            "group.instance.id": f"consumer-{self.id}",
            'bootstrap.servers': self.bootstrap_servers,
            "session.timeout.ms": self.session_timeout_ms,
            'auto.offset.reset': 'earliest',
            'enable.auto.offset.store': True,
            'enable.auto.commit': False,
            'log_level': 7,
            'debug': 'cgrp',
        }
        self.consumer = Consumer(settings, logger=self.logger)
        self.consumer.subscribe([self.topic])

    def _replace_client(self):
        """Close the current client, create a new one and signal completion."""
        self.logger.info(f"restarting consumer with id: {self.id}")
        self.consumer.close()
        self.create_consumer_client()
        self.consumer.poll(0.5)
        self.restart.clear()
        self.restarted.set()

    def _poll_once(self):
        """Poll a single message and record its offset; log any error."""
        try:
            record = self.consumer.poll(timeout=1.0)
            if record is not None:
                if record.error():
                    self.logger.error(
                        f"consumer {self.id} error - {record.error()}")
                else:
                    with self.lock:
                        self.last_consumed = record.offset()
        except Exception as exc:
            self.logger.error(f"consumer {self.id} error - {exc}")

    def loop(self):
        """Thread body: poll until stopped, honouring restart requests."""
        self.create_consumer_client()
        self.logger.info(f"starting consumer with id: {self.id}")
        while not self.stopped.is_set():
            if self.restart.is_set():
                self._replace_client()
            self._poll_once()
        self.logger.info(f"closing consumer with id: {self.id}")
        self.consumer.close()

    def get_last_consumed(self):
        """Return the last recorded offset (-1 until a message is consumed)."""
        with self.lock:
            return self.last_consumed

    def restart_consumer(self):
        """Request a client restart and block until the thread confirms it."""
        self.restart.set()
        self.restarted.wait()
        self.restarted.clear()


class ConsumerGroupStaticMembersRebalance(RedpandaTest):
    """Group-state checks for consumers that set ``group.instance.id``.

    Each test starts a producer and ``consumer_count`` ``TestConsumer``
    instances, applies a disturbance, and then verifies that the group
    returns to the STABLE state and that running consumers keep consuming.
    """
    def __init__(self, test_context):
        super().__init__(test_context=test_context, num_brokers=3)
        self.installer = self.redpanda._installer

    def get_group_description(self):
        """Return the admin client's description of ``self.group_id``.

        Uses ``self.admin_client`` and ``self.group_id``, both of which are
        assigned in ``verify_consumer_group_state_after_action``.
        """
        return self.admin_client.describe_consumer_groups(
            group_ids=[self.group_id])[self.group_id].result()

    # NOTE(review): this helper is not a test itself but carries the same
    # @cluster / @skip_debug_mode decorators as the tests that call it;
    # confirm whether the nested decoration is intentional.
    @cluster(num_nodes=4)
    @skip_debug_mode
    def verify_consumer_group_state_after_action(
            self,
            disturbance_action,
            post_rebalance_check,
            consumer_session_timeout=10000):
        """Run the shared scenario around ``disturbance_action``.

        :param disturbance_action: zero-argument callable invoked once all
            consumers have made progress and the group is STABLE
        :param post_rebalance_check: zero-argument callable returning a truthy
            value when the final group state is as expected
        :param consumer_session_timeout: ``session.timeout.ms`` handed to
            every consumer
        :raises AssertionError: when a group-state assertion or the
            post-rebalance check fails
        """
        self.consumer_count = 120
        topic = TopicSpec(name="test-topic-1",
                          partition_count=self.consumer_count)
        DefaultClient(self.redpanda).create_topic(topic)
        self.group_id = "test-group-1"

        producer = KgoVerifierProducer(self.test_context,
                                       self.redpanda,
                                       topic.name,
                                       msg_size=128,
                                       msg_count=5000000)
        producer.start()
        self.consumers: list[TestConsumer] = []

        # Everything below runs under try/finally so the producer and the
        # consumer threads are stopped even when a wait or assertion fails.
        try:
            for c_id in range(self.consumer_count):
                self.consumers.append(
                    TestConsumer(bootstrap_servers=self.redpanda.brokers(),
                                 group=self.group_id,
                                 topic=topic.name,
                                 id=c_id,
                                 logger=self.logger,
                                 session_timeout_ms=consumer_session_timeout))

            self.admin_client = AdminClient(
                {"bootstrap.servers": self.redpanda.brokers()})

            def consumers_made_progress():
                return all(c.get_last_consumed() >= 0 for c in self.consumers)

            def snapshot_consumers_state():
                return {c.id: c.get_last_consumed() for c in self.consumers}

            wait_until(
                consumers_made_progress,
                timeout_sec=60,
                backoff_sec=1,
                err_msg="Timeout waiting for consumers to start consuming")
            progress_snapshot = snapshot_consumers_state()

            state_before = self.get_group_description()
            self.logger.info("group state before restart: %s",
                             state_before.state)
            assert state_before.state == ConsumerGroupState.STABLE
            assert len(state_before.members) == self.consumer_count

            disturbance_action()

            def group_is_in_stable_state():
                gr = self.get_group_description()
                return gr.state == ConsumerGroupState.STABLE

            wait_until(
                group_is_in_stable_state,
                timeout_sec=60,
                backoff_sec=0.2,
                retry_on_exc=True,
                err_msg="Timeout waiting on group to reach stable state")
            self.logger.info("group rebalanced, waiting for progress")

            def all_consumers_made_progress():
                # Consumers stopped by the disturbance cannot advance, so
                # only the ones still running are checked.
                return all(c.get_last_consumed() > progress_snapshot[c.id]
                           for c in self.consumers if not c.is_stopped())

            wait_until(
                all_consumers_made_progress,
                timeout_sec=60,
                backoff_sec=0.5,
                err_msg="Timeout waiting for all consumers to make progress")

            assert post_rebalance_check(), "post rebalance check failed"
        finally:
            producer.stop()
            for c in self.consumers:
                c.stop()

    @cluster(num_nodes=4)
    @skip_debug_mode
    def test_static_member_rejoining_group(self):
        def restart_then_stop_consumer():
            consumer_to_restart = random.choice(self.consumers)
            last_consumed = consumer_to_restart.get_last_consumed()
            consumer_to_restart.restart_consumer()
            # The restarted consumer reuses its group.instance.id; the group
            # is expected to stay STABLE with the same member count.
            after_restart = self.get_group_description()
            self.logger.info("group state after restart: %s",
                             after_restart.state)
            assert after_restart.state == ConsumerGroupState.STABLE
            assert len(after_restart.members) == self.consumer_count

            wait_until(
                lambda: consumer_to_restart.get_last_consumed() >
                last_consumed,
                timeout_sec=60,
                backoff_sec=1,
                err_msg="Timeout waiting for restarted consumer to progress")
            # now stop consumer, after a timeout group should rebalance and
            # the member should be removed
            consumer_to_restart.stop()

            def group_started_rebalance():
                gr = self.get_group_description()
                return gr.state == ConsumerGroupState.PREPARING_REBALANCING

            wait_until(group_started_rebalance,
                       timeout_sec=60,
                       backoff_sec=0.2,
                       retry_on_exc=True,
                       err_msg="Timeout waiting for group to start rebalance")

        def verify_consumer_is_missing():
            gr = self.get_group_description()
            self.logger.info("post test group state: %s, members count: %s",
                             gr.state, len(gr.members))
            return len(gr.members) == self.consumer_count - 1

        self.verify_consumer_group_state_after_action(
            restart_then_stop_consumer,
            verify_consumer_is_missing,
            consumer_session_timeout=10000)

    # This test currently fails because the consumers are fenced when
    # Redpanda is restarted, hence @ok_to_fail.
    # NOTE(review): the original comment was cut off after "when Redpanda
    # is"; confirm the intended wording.
    @cluster(num_nodes=4)
    @skip_debug_mode
    @ok_to_fail
    def test_force_kill_all_redpanda_nodes(self):
        def restart_all_redpanda_nodes():
            self.logger.info("stopping redpanda")
            for n in self.redpanda.nodes:
                self.redpanda.stop_node(n)
            time.sleep(10)
            self.logger.info("starting redpanda")
            for n in self.redpanda.nodes:
                self.redpanda.start_node(n)

        def verify_all_consumers_are_present():
            gr = self.get_group_description()
            self.logger.info("post test group state: %s, members count: %d",
                             gr.state, len(gr.members))
            return len(gr.members) == self.consumer_count

        self.verify_consumer_group_state_after_action(
            restart_all_redpanda_nodes,
            verify_all_consumers_are_present,
            consumer_session_timeout=10000)

0 comments on commit 3cef311

Please sign in to comment.