From f75a05900860052622b2fa2d74bd223b8569fc3f Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Wed, 4 Oct 2023 20:37:57 -0700
Subject: [PATCH] ducktape: test for node-wise recovery.

---
 tests/rptest/services/admin.py               |  13 ++
 .../partition_force_reconfiguration_test.py  | 136 +++++++++++++++++-
 2 files changed, 148 insertions(+), 1 deletion(-)

diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py
index 6e2b465d97dd0..dca936d80f6cd 100644
--- a/tests/rptest/services/admin.py
+++ b/tests/rptest/services/admin.py
@@ -747,6 +747,19 @@ def force_abort_partition_move(self,
         path = f"partitions/{namespace}/{topic}/{partition}/unclean_abort_reconfiguration"
         return self._request('post', path, node=node)
 
+    def get_majority_lost_partitions_from_nodes(self,
+                                                defunct_brokers: list[int],
+                                                node=None):
+        assert defunct_brokers
+        brokers_csv = ','.join(str(b) for b in defunct_brokers)
+        path = f"partitions/majority_lost?defunct_nodes={brokers_csv}"
+        return self._request('get', path, node).json()
+
+    def force_recover_partitions_from_nodes(self, payload: dict, node=None):
+        assert payload
+        path = "partitions/force_recover_from_nodes"
+        return self._request('post', path, node, json=payload)
+
     def create_user(self, username, password, algorithm):
         self.redpanda.logger.debug(
             f"Creating user {username}:{password}:{algorithm}")
diff --git a/tests/rptest/tests/partition_force_reconfiguration_test.py b/tests/rptest/tests/partition_force_reconfiguration_test.py
index 59066f33fc94e..22ea5c41f2f76 100644
--- a/tests/rptest/tests/partition_force_reconfiguration_test.py
+++ b/tests/rptest/tests/partition_force_reconfiguration_test.py
@@ -6,6 +6,7 @@
 # As of the Change Date specified in that file, in accordance with
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
+import random
 import requests
 from rptest.services.cluster import cluster
 from rptest.clients.types import TopicSpec
@@ -30,7 +31,7 @@ def __init__(self, test_context, *args, **kwargs):
                          self).__init__(test_context, *args, **kwargs)
 
     SPEC = TopicSpec(name="topic", replication_factor=5)
-    WAIT_TIMEOUT_S = 30
+    WAIT_TIMEOUT_S = 60
 
     def _start_redpanda(self, acks=-1):
         self.start_redpanda(
@@ -284,3 +285,136 @@ def get_lso():
         self.await_num_produced(min_records=10000)
         self.start_consumer()
         self.run_validation()
+
+    @cluster(num_nodes=5)
+    @matrix(defunct_node_count=[1, 2])
+    def test_node_wise_recovery(self, defunct_node_count):
+        self.start_redpanda(num_nodes=5,
+                            extra_rp_conf={
+                                "partition_autobalancing_mode": "continuous",
+                                "enable_leader_balancer": False,
+                            })
+        num_topics = 20
+        # Create a mix of rf=1 and rf=3 topics.
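+        # (With defunct_node_count in [1, 2], an rf=1 partition loses its only
+        # replica when its node is stopped, while an rf=3 partition loses its
+        # majority only when two of its three replicas sit on stopped nodes.)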
+        topics = []
+        for i in range(0, num_topics):
+            rf = 3 if i % 2 == 0 else 1
+            parts = random.randint(1, 3)
+            spec = TopicSpec(name=f"topic-{i}",
+                             replication_factor=rf,
+                             partition_count=parts)
+            topics.append(spec)
+            self.client().create_topic(spec)
+
+        admin = self.redpanda._admin
+
+        to_kill_nodes = random.sample(self.redpanda.started_nodes(),
+                                      defunct_node_count)
+        to_kill_node_ids = [
+            int(self.redpanda.node_id(n)) for n in to_kill_nodes
+        ]
+
+        partitions_lost_majority = admin.get_majority_lost_partitions_from_nodes(
+            defunct_brokers=to_kill_node_ids)
+
+        self.logger.debug(f"Stopping nodes: {to_kill_node_ids}")
+        self.redpanda.for_nodes(to_kill_nodes, self.redpanda.stop_node)
+
+        def controller_available():
+            controller = self.redpanda.controller()
+            return controller is not None and self.redpanda.node_id(
+                controller) not in to_kill_node_ids
+
+        wait_until(controller_available,
+                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   backoff_sec=3,
+                   err_msg="Controller not available")
+
+        def make_recovery_payload(defunct_nodes: list[int],
+                                  partitions_lost_majority: dict):
+            return {
+                "defunct_nodes": defunct_nodes,
+                "partitions_to_force_recover": partitions_lost_majority
+            }
+
+        payload = make_recovery_payload(to_kill_node_ids,
+                                        partitions_lost_majority)
+        self.logger.debug(f"payload: {payload}")
+
+        surviving_node = random.choice([
+            n for n in self.redpanda.started_nodes()
+            if self.redpanda.node_id(n) not in to_kill_node_ids
+        ])
+
+        # Issue a node-wise recovery.
+        self.redpanda._admin.force_recover_partitions_from_nodes(
+            payload=make_recovery_payload(to_kill_node_ids,
+                                          partitions_lost_majority),
+            node=surviving_node)
+
+        # Ensure the brokers are marked defunct.
+        def brokers_are_defunct():
+            brokers = admin.get_brokers(node=surviving_node)
+            for broker in brokers:
+                expected_liveness_state = "defunct" if broker[
+                    "node_id"] in to_kill_node_ids else "functional"
+                assert "liveness_status" in broker.keys()
+                if expected_liveness_state != broker["liveness_status"]:
+                    return False
+            return len(brokers) == len(self.redpanda.nodes)
+
+        wait_until(brokers_are_defunct,
+                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   backoff_sec=2,
+                   err_msg="Brokers are not designated defunct")
+
+        # Wait until there are no partition assignments with majority loss due to dead nodes.
+        wait_until(lambda: len(
+            admin.get_majority_lost_partitions_from_nodes(
+                defunct_brokers=to_kill_node_ids, node=surviving_node)) == 0,
+                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   backoff_sec=3,
+                   err_msg="Node wise recovery failed")
+
+        def pending_force_reconfigurations():
+            try:
+                return admin.get_partition_balancer_status(
+                )["partitions_pending_force_recovery_count"]
+            except:
+                return -1
+
+        wait_until(lambda: pending_force_reconfigurations() == 0,
+                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   backoff_sec=2,
+                   err_msg="reported force recovery count is non zero")
+
+        # Ensure every partition has a stable leader.
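+        # (Leadership is queried through the surviving nodes only, via
+        # hosts=self._alive_nodes() below.)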
+        for topic in topics:
+            for part in range(0, topic.partition_count):
+                self.redpanda._admin.await_stable_leader(
+                    topic=topic.name,
+                    partition=part,
+                    timeout_s=self.WAIT_TIMEOUT_S,
+                    backoff_s=2,
+                    hosts=self._alive_nodes())
+
+        def nodes_have_no_replicas(nodes: list[int]):
+            """Returns True if there are no replicas of user ntps on input set of nodes."""
+            node_set = set(nodes)
+            partitions = []
+            for topic in topics:
+                p_list = self.redpanda._admin.get_partitions(topic=topic.name)
+                for partition in p_list:
+                    replicas = set(
+                        [r['node_id'] for r in partition["replicas"]])
+                    if any([r in node_set for r in replicas]):
+                        partitions.append(partition)
+            self.logger.debug(
+                f"partition with replicas on: {nodes} list: {partitions}")
+            return len(partitions) == 0
+
+        # Ensure the partition balancer drains all replicas from defunct nodes.
+        wait_until(lambda: nodes_have_no_replicas(to_kill_node_ids),
+                   timeout_sec=self.WAIT_TIMEOUT_S,
+                   backoff_sec=3,
+                   err_msg="Not all replicas are drained from defunct nodes")
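
For reference, a minimal sketch of what the two Admin API wrappers added to admin.py do at the HTTP level, e.g. when exercising a cluster by hand. The endpoint paths and payload keys are taken from the diff above; the admin listener address (localhost:9644) and the broker ids are assumptions for illustration only.

    # Illustrative sketch only -- not part of the patch.
    import requests

    ADMIN = "http://localhost:9644/v1"  # assumed default admin listener
    defunct = [1, 2]                    # hypothetical dead broker ids

    # GET the partitions whose raft majority lives on the defunct brokers.
    lost = requests.get(
        f"{ADMIN}/partitions/majority_lost",
        params={"defunct_nodes": ",".join(str(b) for b in defunct)}).json()

    # POST a force-recovery request for exactly those partitions.
    requests.post(f"{ADMIN}/partitions/force_recover_from_nodes",
                  json={
                      "defunct_nodes": defunct,
                      "partitions_to_force_recover": lost
                  }).raise_for_status()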