Skip to content

Commit

Permalink
tests: check for coordinator load in progress
Browse files Browse the repository at this point in the history
Signed-off-by: NyaliaLui <[email protected]>
  • Loading branch information
NyaliaLui committed Aug 3, 2023
1 parent 32a4e84 commit 5b5c542
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
23 changes: 21 additions & 2 deletions tests/rptest/tests/consumer_group_recovery_tool_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from rptest.services.rpk_producer import RpkProducer
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.tests.redpanda_test import RedpandaTest
from rptest.util import wait_until_result
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize

Expand All @@ -31,7 +32,6 @@ def __init__(self, test_ctx, *args, **kwargs):
test_ctx,
num_brokers=3,
*args,
# disable leader balancer to make sure that group will not be realoaded because of leadership changes
extra_rp_conf={
# clear topics from the the kafka_nodelete_topics to allow for
# __consumer_offsets to be configured in this test.
Expand All @@ -44,7 +44,26 @@ def __init__(self, test_ctx, *args, **kwargs):
def describe_all_groups(self):
rpk = RpkTool(self.redpanda)
all_groups = {}
for g in rpk.group_list():

def do_list_groups():
try:
res = rpk.group_list()
assert res is not None
return (True, res)
except RpkException as ex:
self.logger.debug(ex)
if "COORDINATOR_LOAD_IN_PROGRESS" in str(ex):
# Retry the ListGroups req
return False
raise ex

group_list_res = wait_until_result(
do_list_groups,
timeout_sec=30,
backoff_sec=0.5,
err_msg="RPK failed to list consumer groups")

for g in group_list_res:
gd = rpk.group_describe(g)
all_groups[gd.name] = {}
for p in gd.partitions:
Expand Down
26 changes: 25 additions & 1 deletion tools/consumer_offsets_recovery/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,40 @@
from kafka.admin.new_topic import NewTopic
from kafka import TopicPartition
from kafka import OffsetAndMetadata
from kafka.errors import GroupLoadInProgressError
from jproperties import Properties
import logging
from ducktape.utils.util import wait_until

logger = logging.getLogger('cg-recovery-tool')


def read_offsets(admin: KafkaAdminClient):

groups_dict = {}
groups = admin.list_consumer_groups()
groups = None

def do_list_consumer_groups():
# nonlocal informs python that the variable is defined outside
# the current scope
nonlocal groups
try:
groups = admin.list_consumer_groups()
assert groups is not None
return True
except GroupLoadInProgressError as ex:
# GroupLoadInProgressError represents COORDINATOR_LOAD_IN_PROGRESS
# retry on this error
logger.debug(ex)
return False

wait_until(
do_list_consumer_groups,
timeout_sec=10,
backoff_sec=0.5,
err_msg="Consumer offsets recovery tool failed to list consumer groups"
)

for g, _ in groups:
logger.info(f"reading group '{g}' offsets")
offsets = admin.list_consumer_group_offsets(group_id=g)
Expand Down

0 comments on commit 5b5c542

Please sign in to comment.