Merge pull request #13661 from bharathv/all_replicas_gone
controller: force reconfiguration from all replicas gone scenario
bharathv authored Sep 28, 2023
2 parents 7fcbe01 + ee72146 commit d6f608a
Showing 5 changed files with 99 additions and 10 deletions.
3 changes: 2 additions & 1 deletion src/v/cluster/controller_backend.cc
@@ -1229,7 +1229,8 @@ controller_backend::process_partition_reconfiguration(
* created with cancel/abort type of deltas.
*/
vassert(
-      type == topic_table_delta::op_type::update,
+      type == topic_table_delta::op_type::update
+        || type == topic_table_delta::op_type::force_update,
"Invalid reconciliation loop state. Partition replicas should not be "
"removed before finishing update, ntp: {}, current operation: {}, "
"target_assignment: {}",
2 changes: 2 additions & 0 deletions src/v/features/feature_table.cc
@@ -79,6 +79,8 @@ std::string_view to_string_view(feature f) {
return "raft_coordinated_recovery";
case feature::cloud_storage_scrubbing:
return "cloud_storage_scrubbing";
case feature::enhanced_force_reconfiguration:
return "enhanced_force_reconfiguration";

/*
* testing features
7 changes: 7 additions & 0 deletions src/v/features/feature_table.h
@@ -64,6 +64,7 @@ enum class feature : std::uint64_t {
lightweight_heartbeats = 1ULL << 30U,
raft_coordinated_recovery = 1ULL << 31U,
cloud_storage_scrubbing = 1ULL << 32U,
enhanced_force_reconfiguration = 1ULL << 33U,

// Dummy features for testing only
test_alpha = 1ULL << 61U,
@@ -299,6 +300,12 @@ constexpr static std::array feature_schema{
"cloud_storage_scrubbing",
feature::cloud_storage_scrubbing,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always},
feature_spec{
cluster::cluster_version{11},
"enhanced_force_reconfiguration",
feature::enhanced_force_reconfiguration,
feature_spec::available_policy::always,
feature_spec::prepare_policy::always}};

std::string_view to_string_view(feature);
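The new flag follows the existing feature_spec pattern: it becomes available and prepared automatically once the cluster reaches logical version 11. As a minimal sketch (not part of this diff), a test could confirm the flag is active before relying on the relaxed admin check below; this assumes rptest's Admin client exposes get_features() and that the payload carries a "features" list with per-feature names and states.

# Sketch only: check that "enhanced_force_reconfiguration" is active on a
# running cluster. Assumes Admin.get_features() returns the /v1/features
# payload with a "features" list of {"name": ..., "state": ...} entries.
from rptest.services.admin import Admin


def enhanced_force_reconfiguration_active(redpanda) -> bool:
    admin = Admin(redpanda)
    features = admin.get_features()["features"]
    return any(f["name"] == "enhanced_force_reconfiguration"
               and f["state"] == "active" for f in features)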
8 changes: 6 additions & 2 deletions src/v/redpanda/admin_server.cc
@@ -3010,8 +3010,12 @@ admin_server::force_set_partition_replicas_handler(
replicas);
co_return ss::json::json_void();
}

-    if (!cluster::is_proper_subset(replicas, current_replicas)) {
+    auto relax_restrictions
+      = _controller->get_feature_table().local().is_active(
+        features::feature::enhanced_force_reconfiguration);
+    if (
+      !relax_restrictions
+      && !cluster::is_proper_subset(replicas, current_replicas)) {
throw ss::httpd::bad_request_exception(fmt::format(
"Target assignment {} is not a proper subset of current {}, "
"choose a proper subset of existing replicas.",
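With enhanced_force_reconfiguration active, the handler no longer insists that the target assignment be a proper subset of the current replicas, which is what allows moving a partition whose only replica is gone onto fresh nodes. Below is a rough sketch of driving the handler over raw HTTP (not part of this diff): the endpoint path is an assumption inferred from the handler name, and rptest instead goes through its _force_reconfiguration helper; only the node_id/core payload shape is taken from the test below.

# Hypothetical sketch: POST a new replica set for kafka/<topic>/<partition>
# to the admin API (default admin port 9644). The path below is an
# assumption; only the payload shape matches the Replica dicts in the test.
import requests


def force_set_replicas(admin_host: str, topic: str, partition: int,
                       replicas: list[dict]) -> None:
    # replicas, e.g. [{"node_id": 1, "core": 0}]
    url = (f"http://{admin_host}:9644/v1/partitions/kafka/"
           f"{topic}/{partition}/force_replicas")
    resp = requests.post(url, json=replicas, timeout=10)
    resp.raise_for_status()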
89 changes: 82 additions & 7 deletions tests/rptest/tests/partition_force_reconfiguration_test.py
@@ -16,6 +16,8 @@
from random import shuffle
import time
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.services.admin import Replica
from rptest.clients.kcl import KCL


class PartitionForceReconfigurationTest(EndToEndTest, PartitionMovementMixin):
@@ -43,14 +45,17 @@ def _start_redpanda(self, acks=-1):
        self.await_num_produced(min_records=10000)

    def _wait_until_no_leader(self):
-        ntp = f"kafka/{self.topic}/0"
-
+        """Scrapes the debug endpoints of all replicas and checks if any of the replicas think they are the leader"""
        def no_leader():
-            hov = self.redpanda._admin.get_cluster_health_overview()
-            leaderless_parts = hov['leaderless_partitions']
-            self.redpanda.logger.debug(
-                f"Leaderless partitions: {leaderless_parts}")
-            return ntp in leaderless_parts
+            state = self.redpanda._admin.get_partition_state(
+                "kafka", self.topic, 0)
+            if "replicas" not in state.keys() or len(state["replicas"]) == 0:
+                return True
+            for r in state["replicas"]:
+                assert "raft_state" in r.keys()
+                if r["raft_state"]["is_leader"]:
+                    return False
+            return True

        wait_until(no_leader,
                   timeout_sec=30,
@@ -204,3 +209,73 @@ def test_reconfiguring_with_dead_node(self, controller_snapshots):
        self.redpanda._admin.await_stable_leader(topic=self.topic,
                                                 replication=len(alive) + 1,
                                                 hosts=self._alive_nodes())

    @cluster(num_nodes=7)
    @matrix(target_replica_set_size=[1, 3])
    def test_reconfiguring_all_replicas_lost(self, target_replica_set_size):
        self.start_redpanda(num_nodes=4)
        assert self.redpanda

        # create a topic with rf = 1
        self.topic = "topic"
        self.client().create_topic(
            TopicSpec(name=self.topic, replication_factor=1))

        kcl = KCL(self.redpanda)

        # produce some data.
        self.start_producer(acks=1)
        self.await_num_produced(min_records=10000)
        self.producer.stop()

        def get_stable_lso():
            def get_lso():
                try:
                    partitions = kcl.list_offsets([self.topic])
                    if len(partitions) == 0:
                        return -1
                    return partitions[0].end_offset
                except:
                    return -1

            wait_until(
                lambda: get_lso() != -1,
                timeout_sec=30,
                backoff_sec=1,
                err_msg=
                f"Partition {self.topic}/0 couldn't achieve a stable lso")

            return get_lso()

        lso = get_stable_lso()
        assert lso >= 10001, f"Partition {self.topic}/0 has incorrect lso {lso}"

        # kill the broker hosting the replica
        (killed, alive) = self._stop_majority_nodes(replication=1)
        assert len(killed) == 1
        assert len(alive) == 0

        self._wait_until_no_leader()

        # force reconfigure to target replica set size
        assert target_replica_set_size <= len(self._alive_nodes())
        new_replicas = [
            Replica(dict(node_id=self.redpanda.node_id(replica), core=0)) for
            replica in self.redpanda.started_nodes()[:target_replica_set_size]
        ]
        self._force_reconfiguration(new_replicas=new_replicas)

        self.redpanda._admin.await_stable_leader(
            topic=self.topic,
            replication=target_replica_set_size,
            hosts=self._alive_nodes())

        # Ensure it is empty
        lso = get_stable_lso()
        assert lso == 0, f"Partition {self.topic}/0 has incorrect lso {lso}"

        # check if we can produce/consume with new replicas from a client perspective
        self.start_producer()
        self.await_num_produced(min_records=10000)
        self.start_consumer()
        self.run_validation()
