Skip to content

Commit

Permalink
Merge pull request #11547 from mmaslankaprv/fix-11454
Browse files Browse the repository at this point in the history
Track progress of topic aware rebalancing
  • Loading branch information
mmaslankaprv authored Jun 21, 2023
2 parents afd9b81 + 12d2921 commit cfd356e
Showing 1 changed file with 46 additions and 11 deletions.
57 changes: 46 additions & 11 deletions tests/rptest/tests/leadership_transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,40 @@ def all_partitions_present(nodes: int):

return True

def count_leaders_per_node(topic_name: str):
leaders_per_node = collections.defaultdict(int)
tps = self.redpanda.partitions(topic_name)
for p in tps:
if p.leader:
leaders_per_node[p.leader] += 1

return leaders_per_node

def distribution_error():
nodes = [
self.redpanda.node_id(n)
for n in self.redpanda.started_nodes()
]
error = 0.0
for t in self.topics:
leaders_per_node = count_leaders_per_node(topic_name=t.name)
opt_leaders = t.partition_count / len(nodes)

for n in nodes:
if n in leaders_per_node:
error += (opt_leaders - leaders_per_node[n])**2
else:
error += opt_leaders**2

return error

def has_leader_count(topic_name: str, min_per_node: int,
nodes: int) -> bool:
tps = self.redpanda.partitions(topic_name)
leaders = [p.leader for p in tps if p.leader]
leaders_per_node = count_leaders_per_node(topic_name)

if len(set(leaders)) < nodes:
if len(set(leaders_per_node)) < nodes:
return False

leaders_per_node = dict(zip(leaders, [0] * len(leaders)))
for l in leaders:
leaders_per_node[l] += 1

self.logger.info(
f"{topic_name} has dist {leaders_per_node.values()}")
return all(leader_cnt >= min_per_node
Expand Down Expand Up @@ -194,11 +216,24 @@ def topic_leadership_evenly_distributed():
start_timeout = 60
self.redpanda.start_node(node, timeout=start_timeout)

def wait_for_topics_evenly_distributed(improvement_deadline):
last_update = time.time()
last_error = distribution_error()
while (time.time() - last_update < improvement_deadline):
if topic_leadership_evenly_distributed():
return True
current_error = distribution_error()
self.logger.debug(
f"current distribution error: {current_error}, previous error: {last_error}, last improvement update: {last_update}"
)
if current_error < last_error:
last_update = time.time()
last_error = current_error

time.sleep(5)

self.logger.info("stabilization post start")
wait_until(lambda: topic_leadership_evenly_distributed(),
timeout_sec=300,
backoff_sec=10,
err_msg="Leadership did not stablize")
wait_for_topics_evenly_distributed(30)


class AutomaticLeadershipBalancingTest(RedpandaTest):
Expand Down

0 comments on commit cfd356e

Please sign in to comment.