Skip to content

Commit

Permalink
tests/rpk: add tolerant mode to rpk group describe parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Oct 20, 2023
1 parent 5ba72ac commit 2c12207
Showing 1 changed file with 18 additions and 27 deletions.
45 changes: 18 additions & 27 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class RpkGroupPartition(typing.NamedTuple):
instance_id: str
client_id: str
host: str
error: str


class RpkGroup(typing.NamedTuple):
Expand Down Expand Up @@ -591,33 +592,13 @@ def group_seek_to(self, group, to):
cmd = ["seek", group, "--to", to]
self._run_group(cmd)

def group_describe(self, group, summary=False):
def group_describe(self, group, summary=False, tolerant=False):
def parse_field(field_name, string):
pattern = re.compile(f" *{field_name} +(?P<value>.+)")
m = pattern.match(string)
assert m is not None, f"Field string '{string}' does not match the pattern"
return m['value']

def check_lines(lines):
for line in lines:
# UNKNOWN_TOPIC_OR_PARTITION: This server doesn't contain this partition or topic.
# We should wait until server will get information about it.
if line.find('UNKNOWN_TOPIC_OR_PARTITION') != -1:
return False

# Leadership movements are underway
if 'NOT_LEADER_FOR_PARTITION' in line:
return False

# Cluster not ready yet
if 'unknown broker' in line:
return False

if "missing from list offsets" in line:
return False

return True

def try_describe_group(group):
if summary:
cmd = ["describe", "-s", group]
Expand Down Expand Up @@ -646,9 +627,6 @@ def try_describe_group(group):

lines = out.splitlines()

if not check_lines(lines):
return None

group_name = parse_field("GROUP", lines[0])
coordinator = parse_field("COORDINATOR", lines[1])
state = parse_field("STATE", lines[2])
Expand Down Expand Up @@ -683,9 +661,21 @@ def try_describe_group(group):
for i in range(len(table.columns)))

# Check to see if info for the partition was queried during a change in leadership.
error = obj.get("ERROR", "")
if "NOT_LEADER_FOR_PARTITION" in error:
return None
error = obj.get("ERROR")
if not tolerant and error:
error_strs = [
# UNKNOWN_TOPIC_OR_PARTITION: This server doesn't contain this partition
# or topic. We should wait until server will get information about it.
"UNKNOWN_TOPIC_OR_PARTITION",
# Leadership movements are underway
"NOT_LEADER_FOR_PARTITION",
# Cluster not ready yet
"unknown broker",
# ListOffsets request (needed to calculate lag) errored or was incomplete
"missing from list offsets",
]
if any(e in error for e in error_strs):
return None

def maybe_parse_int(field):
# Account for negative numbers and '-' value
Expand All @@ -703,6 +693,7 @@ def maybe_parse_int(field):
instance_id=obj.get("INSTANCE-ID"),
client_id=obj["CLIENT-ID"],
host=obj["HOST"],
error=error,
)

partitions.append(partition)
Expand Down

0 comments on commit 2c12207

Please sign in to comment.