Skip to content

Commit

Permalink
make system more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
rystsov committed Sep 27, 2023
1 parent c44ee37 commit 0cb37cc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 46 deletions.
89 changes: 43 additions & 46 deletions src/v/cluster/drain_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,38 +44,16 @@ ss::future<> drain_manager::stop() {
// multiple times?
ss::future<> drain_manager::drain() {
return container().invoke_on_all([](cluster::drain_manager& self) {
if (self._abort.abort_requested()) {
// handle http requests racing with shutdown
return ss::now();
}

if (self._draining) {
vlog(clusterlog.info, "Node draining is already active");
return ss::now();
}

vlog(clusterlog.info, "Node draining is starting");
self._draining = true;
self._status = drain_status{};
self._requests.push(operation_type::drain);
self._sem.signal();
return ss::now();
});
}

ss::future<> drain_manager::restore() {
return container().invoke_on_all([](cluster::drain_manager& self) {
if (self._abort.abort_requested()) {
return ss::now();
}

if (!self._draining) {
vlog(clusterlog.info, "Node draining is not active");
return ss::now();
}

vlog(clusterlog.info, "Node draining is stopping");
// shouldn't we reset _status?! otherwise it's already finished
self._draining = false;
self._requests.push(operation_type::restore);
self._pending_restore += 1;
self._sem.signal();
return ss::now();
});
Expand All @@ -89,7 +67,7 @@ ss::future<std::optional<drain_manager::drain_status>> drain_manager::status() {

co_return co_await container().map_reduce0(
[](drain_manager& dm) -> std::optional<drain_status> {
if (dm._draining) {
if (dm._draining || dm._drained) {
return dm._status;
}
return std::nullopt;
Expand Down Expand Up @@ -131,30 +109,48 @@ ss::future<std::optional<drain_manager::drain_status>> drain_manager::status() {
ss::future<> drain_manager::task() {
while (true) {
co_await _sem.wait();
// -why to consume?! wait already substructs
_sem.consume(_sem.available_units());

if (_abort.abort_requested()) {
break;
}
if (_requests.empty()) {
continue;
}
auto op = _requests.front();
_requests.pop();
if (op == operation_type::drain) {
if (_drained) {
continue;
}
_draining = true;
}
if (op == operation_type::restore) {
_pending_restore -= 1;
_drained = false;
_draining = false;
}

const auto draining = _draining;
try {
if (draining) {
co_await do_drain();
} else {
co_await do_restore();
while (true) {
try {
switch (op) {
case operation_type::drain:
co_await do_drain();
break;
case operation_type::restore:
co_await do_restore();
break;
}
} catch (...) {
vlog(
clusterlog.warn,
"Draining task {{{}}} experienced error: {}",
op == operation_type::drain ? "drain" : "restore",
std::current_exception());
_status.errors = true;
continue;
}
} catch (...) {
// should errors trigger another attempt instead of false
// marking the draining as done?
vlog(
clusterlog.warn,
"Draining task {{{}}} experienced error: {}",
draining ? "drain" : "restore",
std::current_exception());
_status.errors = true;
break;
}

_status.finished = true;
}
}
Expand All @@ -170,7 +166,7 @@ ss::future<> drain_manager::do_drain() {
*/
_partition_manager.local().block_new_leadership();

while (_draining && !_abort.abort_requested()) {
while (_pending_restore == 0 && !_abort.abort_requested()) {
/*
* build a set of eligible partitions. ignore any raft groups that
* will fail when transferring leadership and which shouldn't be
Expand All @@ -190,6 +186,7 @@ ss::future<> drain_manager::do_drain() {
_status.partitions = _partition_manager.local().partitions().size();

if (eligible.empty()) {
_drained = true;
vlog(
clusterlog.info,
"Node draining has completed on shard {}",
Expand Down Expand Up @@ -269,7 +266,7 @@ ss::future<> drain_manager::do_drain() {
* to avoid spinning, cool off if we failed fast
*/
auto dur = ss::lowres_clock::now() - started;
if (failed > 0 && dur < transfer_throttle && _draining) {
if (failed > 0 && dur < transfer_throttle && _pending_restore == 0) {
try {
co_await ss::sleep_abortable(transfer_throttle - dur, _abort);
} catch (ss::sleep_aborted&) {
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/drain_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <seastar/util/log.hh>

#include <chrono>
#include <queue>

namespace cluster {

Expand Down Expand Up @@ -92,6 +93,8 @@ class drain_manager : public ss::peering_sharded_service<drain_manager> {
ss::future<std::optional<drain_status>> status();

private:
enum class operation_type : int32_t { drain = 0, restore = 1 };

ss::future<> task();
ss::future<> do_drain();
ss::future<> do_restore();
Expand All @@ -102,6 +105,9 @@ class drain_manager : public ss::peering_sharded_service<drain_manager> {
ssx::semaphore _sem{0, "c/drain-mgr"};
drain_status _status;
ss::abort_source _abort;
std::queue<operation_type> _requests;
int32_t _pending_restore{0};
bool _drained{false};
};

} // namespace cluster
Expand Down

0 comments on commit 0cb37cc

Please sign in to comment.