[yugabyte#18744] DocDB: Add ability to recover from follower lag caused by stuck OutboundCall

Summary:
In production, we saw a few cases where follower lag was continuously increasing for a few tablets, and the lagging replica was not being removed from the quorum by the ConsensusQueue either. After capturing a dump of the node hosting the leader, we were able to figure out that the performing_update_mutex had been held for a long time for the affected Peer.

A Peer acquires the performing_update_mutex while building a request to send to the remote peer and keeps the mutex locked until it receives a response. In the captured dump, we noticed that the OutboundCall was in the SENT state, but we could not confirm whether the connection it was sent on was still active -- an analysis of the OutboundCall references suggested that the connection had been shut down, but we were not sure.

This change tries to detect a stuck OutboundCall in Peer:
* Whenever we try to send more data or a heartbeat to the peer (in `SignalRequest`), we check whether we can acquire the `performing_update_mutex`. If the mutex is already held, we check whether more than the request timeout + `FLAGS_stuck_peer_call_threshold_ms` has elapsed since the call start time.
* If the lock has been held for longer than timeout + `FLAGS_stuck_peer_call_threshold_ms`, we log additional details that can help identify the root cause of the issue (see below for examples).
* And when `FLAGS_force_recover_from_stuck_peer_call` is additionally set to true, we try to mark the stuck call as failed. If the call object is not present, we won't be able to recover from this situation. (A simplified sketch of this detection flow is shown right after this list.)
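
A minimal, self-contained sketch of the detection pattern described above, with simplified names and std::chrono standing in for the actual CoarseMonoClock/RpcController machinery (the real implementation is in the consensus_peers.cc hunk further down):
```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using Clock = std::chrono::steady_clock;

// Simplified stand-ins for the flags and the in-flight call state.
constexpr auto kRequestTimeout = std::chrono::seconds(5);
constexpr auto kStuckThreshold = std::chrono::seconds(10);  // stuck_peer_call_threshold_ms analogue
constexpr bool kForceRecover = false;                       // force_recover_from_stuck_peer_call analogue

std::mutex performing_update_mutex;  // held while an RPC to the peer is outstanding
std::atomic<Clock::time_point> last_rpc_start_time{Clock::time_point::min()};

// Called from the send path: if the previous call still holds the mutex,
// check whether it has been outstanding for longer than timeout + threshold.
void MaybeDetectStuckCall() {
  std::unique_lock<std::mutex> lock(performing_update_mutex, std::try_to_lock);
  if (lock.owns_lock()) {
    return;  // No outstanding call; the normal send path would proceed here.
  }
  auto start = last_rpc_start_time.load(std::memory_order_acquire);
  if (start == Clock::time_point::min()) {
    return;  // No call start time recorded yet.
  }
  auto now = Clock::now();
  if (now > start + kRequestTimeout + kStuckThreshold) {
    std::cout << "Found an RPC call in stuck state, outstanding for "
              << std::chrono::duration_cast<std::chrono::seconds>(now - start).count() << "s\n";
    if (kForceRecover) {
      // The real code calls controller_.MarkCallAsFailed() here, which fails the
      // stuck OutboundCall so the Peer can release the mutex and make progress.
    }
  }
}

int main() {
  // Simulate an RPC that started long ago and never completed: a helper thread
  // holds performing_update_mutex for a while, standing in for the outstanding call.
  last_rpc_start_time.store(Clock::now() - std::chrono::seconds(60), std::memory_order_release);
  std::thread holder([] {
    std::lock_guard<std::mutex> held(performing_update_mutex);
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
  });
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // let the holder grab the lock
  MaybeDetectStuckCall();  // prints the stuck-call message
  holder.join();
  return 0;
}
```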

Another change is that whenever a Connection encounters a write failure, instead of destroying the connection immediately, the operation queues a reactor task so that all queued operations on the socket are executed in order. However, since the socket is closed, every queued operation also encounters a write failure, and each of them schedules a DestroyConnection task. After the first DestroyConnection task runs, the connection can no longer be found in the reactor-tracked connections, which leads to a CHECK failure. To prevent multiple DestroyConnection tasks for a single connection, we now track whether the connection has already queued a task for its own destruction.
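
Conceptually, the guard is a single atomic flag flipped with `exchange`, so only the first write failure gets to schedule the destroy task. A minimal sketch of that idea (class and function names here are illustrative, not the actual yb::rpc API):
```cpp
#include <atomic>
#include <functional>
#include <iostream>

// Ensures the destroy task is scheduled at most once per connection, no matter
// how many queued writes fail after the socket has been closed.
class ConnectionLike {
 public:
  void OnWriteFailure(const std::function<void()>& schedule_destroy_task) {
    // exchange() returns the previous value; only the first failing write sees
    // 'false' and gets to schedule the DestroyConnection task.
    if (!queued_destroy_connection_.exchange(true, std::memory_order_acq_rel)) {
      schedule_destroy_task();
    }
  }

 private:
  std::atomic<bool> queued_destroy_connection_{false};
};

int main() {
  ConnectionLike conn;
  int scheduled = 0;
  // Simulate several queued operations all hitting write failures on a closed socket.
  for (int i = 0; i < 3; ++i) {
    conn.OnWriteFailure([&] { ++scheduled; });
  }
  std::cout << "DestroyConnection scheduled " << scheduled << " time(s)\n";  // prints 1
  return 0;
}
```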

Information logged when we detect this situation -
```
I0822 13:36:35.338526 4161232384 rpc_stub-test.cc:1153] OutboundCall (0x0000000107186018 -> RPC call yb.rpc_test.CalculatorService.Concat -> { remote: 127.0.0.1:58098 idx: 1 protocol: 0x0000000103fae8f0 -> tcp } , state=SENT.) tostring: RPC call yb.rpc_test.CalculatorService.Concat -> { remote: 127.0.0.1:58098 idx: 1 protocol: 0x0000000103fae8f0 -> tcp } , state=SENT., times (start, sent, callback, now): (3713549.841s, 3713549.842s, 0.000s, 3713549.947s), connection: 0x000000010708a658 -> Connection (0x000000010708a658) client 127.0.0.1:58100 => 127.0.0.1:58098
I0822 13:36:35.338694 1845915648 connection.cc:409] Connection (0x000000010708a658) client 127.0.0.1:58100 => 127.0.0.1:58098: LastActivityTime: 3713549.839s, ActiveCalls stats: { during shutdown: 0, current size: 0, earliest expiry: 0.000s, }, OutboundCall: { ptr: 0x0000000107186018, call id: 2, is active: 0 }, Shutdown status: Service unavailable (yb/rpc/reactor.cc:106): Shutdown connection (system error 58), Shutdown time: 3713549.940s, Queue attempts after shutdown: { calls: 0, responses: 0 }
```

1. OutboundCall timing details - the time the call was created, when it was sent on the network, and when the callback was invoked.
2. Connection - we dump the state of the connection the call was sent on, including whether the connection is still alive. If it is alive, we log the active call count and whether the current call is present in the active calls. If it is not alive, we log the time the connection was shut down, the number of active calls present during shutdown, and the number of queue attempts after shutdown.

Using the above timing information and connection state, we can determine the order of events. For example, in the sample logs above, the connection was closed after the data had already been sent on the socket.

Example trace for stuck call detection in Peer -
```
I0817 17:42:07.975589 4161232384 consensus_peers.cc:179] T test-peers-tablet P peer-0 -> Peer peer-1 ([], []): Found a RPC call in stuck state - is_finished: 0, timeout: 0.005s, last_update_lock_release_time_: 3296281.832s, stuck threshold: 0.001s, force recover: 0, call state: call not set
```
Jira: DB-7637

Test Plan:
./yb_build.sh --sj
./build/latest/tests-rpc/rpc_stub-test --gtest_filter=*StuckOutboundCall*

Reviewers: mbautin, sergei

Reviewed By: mbautin

Subscribers: amitanand, yql, yyan, rthallam, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D27859
karan-yb committed Aug 23, 2023
1 parent b7d8bed commit 98586ef
Showing 19 changed files with 307 additions and 48 deletions.
3 changes: 3 additions & 0 deletions src/yb/consensus/consensus_peers-test.cc
@@ -50,6 +50,7 @@
#include "yb/util/metrics.h"
#include "yb/util/opid.h"
#include "yb/util/scope_exit.h"
#include "yb/util/source_location.h"
#include "yb/util/status_log.h"
#include "yb/util/test_macros.h"
#include "yb/util/test_util.h"
@@ -58,6 +59,8 @@
using namespace std::chrono_literals;

METRIC_DECLARE_entity(tablet);
DECLARE_int32(stuck_peer_call_threshold_ms);
DECLARE_bool(force_recover_from_stuck_peer_call);

namespace yb {
namespace consensus {
65 changes: 52 additions & 13 deletions src/yb/consensus/consensus_peers.cc
@@ -97,6 +97,15 @@ DEFINE_test_flag(int32, delay_removing_peer_with_failed_tablet_secs, 0,
"indicating that a tablet is in the FAILED state, and before marking this peer "
"as failed.");

DEFINE_RUNTIME_int32(consensus_stuck_peer_call_threshold_ms, 10000,
"Time to wait after timeout before considering a RPC call as stuck.");
TAG_FLAG(consensus_stuck_peer_call_threshold_ms, advanced);

DEFINE_RUNTIME_bool(consensus_force_recover_from_stuck_peer_call, false,
"Set this flag to true in addition to consensus_stuck_peer_call_threshold_ms to automatically "
"recover from stuck RPC call.");
TAG_FLAG(consensus_force_recover_from_stuck_peer_call, advanced);

// Allow for disabling remote bootstrap in unit tests where we want to test
// certain scenarios without triggering bootstrap of a remote peer.
DEFINE_test_flag(bool, enable_remote_bootstrap, true,
@@ -158,6 +167,30 @@ Status Peer::SignalRequest(RequestTriggerMode trigger_mode) {
// If there are new requests in the queue we'll get them on ProcessResponse().
auto performing_update_lock = LockPerformingUpdate(std::try_to_lock);
if (!performing_update_lock.owns_lock()) {
// In production, we have seen some cases where OutboundCall is stuck in SENT state and there is
// no callback invoked for the call. When FLAGS_consensus_stuck_peer_call_threshold_ms config is
// set, we try to detect this situation. And if
// FLAGS_consensus_force_recover_from_stuck_peer_call config is set to true, then we will mark
// the outstanding call as failed. Note: if there is no outstanding call in controller or
// timeout is not initialized, then we won't be able to recover it (we didn't see this situation
// in production).
auto timeout = controller_.timeout();
if (FLAGS_consensus_stuck_peer_call_threshold_ms > 0 && timeout.Initialized()) {
const MonoDelta stuck_threshold = FLAGS_consensus_stuck_peer_call_threshold_ms * 1ms;
auto now = CoarseMonoClock::Now();
auto last_rpc_start_time = last_rpc_start_time_.load(std::memory_order_acquire);
if (last_rpc_start_time != CoarseTimePoint::min() &&
now > last_rpc_start_time + stuck_threshold + timeout && !controller_.finished()) {
LOG_WITH_PREFIX(INFO) << Format(
"Found a RPC call in stuck state - timeout: $0, last_rpc_start_time: $1, "
"stuck threshold: $2, force recover: $3, call state: $4",
timeout, last_rpc_start_time, stuck_threshold,
FLAGS_consensus_force_recover_from_stuck_peer_call, controller_.CallStateDebugString());
if (FLAGS_consensus_force_recover_from_stuck_peer_call) {
controller_.MarkCallAsFailed();
}
}
}
return Status::OK();
}

@@ -203,10 +236,14 @@ void Peer::DumpToHtml(std::ostream& out) const {
out << "Peer:" << std::endl;
std::lock_guard lock(peer_lock_);
out << Format(
"<ul><li>$0</li><li>$1</li><li>$2</li><li>$3</li></ul>",
"<ul><li>$0</li><li>$1</li><li>$2</li><li>$3</li><li>$4</li><li>$5</li></ul>",
EscapeForHtmlToString(Format("State: $0", state_)),
EscapeForHtmlToString(Format("Current Heartbeat Id: $0", cur_heartbeat_id_)),
EscapeForHtmlToString(Format("Failed Attempts: $0", failed_attempts_)),
EscapeForHtmlToString(Format(
"Last RPC start time: $0", last_rpc_start_time_.load(std::memory_order_acquire))),
EscapeForHtmlToString(
Format("WaitingForRPCResponse: $0", performing_update_mutex_.is_locked())),
peer_pb_str)
<< std::endl;
}
@@ -366,6 +403,7 @@ void Peer::SendNextRequest(RequestTriggerMode trigger_mode) {
processing_lock.unlock();
performing_update_lock.release();
controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolHigh);
last_rpc_start_time_.store(CoarseMonoClock::now(), std::memory_order_release);
proxy_->UpdateAsync(update_request_, trigger_mode, update_response_, &controller_,
std::bind(&Peer::ProcessResponse, retain_self));
}
@@ -454,6 +492,7 @@ void Peer::ProcessResponse() {
if (status.ok()) {
status = controller_.thread_pool_failure();
}
last_rpc_start_time_.store(CoarseTimePoint::min(), std::memory_order_release);
controller_.Reset();

auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
@@ -473,6 +512,7 @@
void Peer::ProcessHeartbeatResponse(const Status& status) {
DCHECK(performing_heartbeat_mutex_.is_locked()) << "Got a heartbeat when nothing was pending.";
DCHECK(heartbeat_request_.ops().empty()) << "Got a heartbeat with a non-zero number of ops.";
last_rpc_start_time_.store(CoarseMonoClock::now(), std::memory_order_release);

auto performing_heartbeat_lock = LockPerformingHeartbeat(std::adopt_lock);
auto processing_lock = StartProcessingUnlocked();
@@ -508,6 +548,7 @@ Status Peer::SendRemoteBootstrapRequest() {
YB_LOG_WITH_PREFIX_EVERY_N_SECS(INFO, 30) << "Sending request to remotely bootstrap";
controller_.set_invoke_callback_mode(rpc::InvokeCallbackMode::kThreadPoolNormal);
return raft_pool_token_->SubmitFunc([retain_self = shared_from_this()]() {
retain_self->last_rpc_start_time_.store(CoarseMonoClock::now(), std::memory_order_release);
retain_self->proxy_->StartRemoteBootstrap(
&retain_self->rb_request_, &retain_self->rb_response_, &retain_self->controller_,
std::bind(&Peer::ProcessRemoteBootstrapResponse, retain_self));
@@ -516,6 +557,7 @@

void Peer::ProcessRemoteBootstrapResponse() {
Status status = controller_.status();
last_rpc_start_time_.store(CoarseTimePoint::min(), std::memory_order_release);
controller_.Reset();

auto performing_update_lock = LockPerformingUpdate(std::adopt_lock);
@@ -621,10 +663,9 @@ void RpcPeerProxy::UpdateAsync(const LWConsensusRequestPB* request,
consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
}

void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
VoteResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
void RpcPeerProxy::RequestConsensusVoteAsync(
const VoteRequestPB* request, VoteResponsePB* response, rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
}

@@ -636,17 +677,15 @@ void RpcPeerProxy::RunLeaderElectionAsync(const RunLeaderElectionRequestPB* requ
consensus_proxy_->RunLeaderElectionAsync(*request, response, controller, callback);
}

void RpcPeerProxy::LeaderElectionLostAsync(const LeaderElectionLostRequestPB* request,
LeaderElectionLostResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
void RpcPeerProxy::LeaderElectionLostAsync(
const LeaderElectionLostRequestPB* request, LeaderElectionLostResponsePB* response,
rpc::RpcController* controller, const rpc::ResponseCallback& callback) {
consensus_proxy_->LeaderElectionLostAsync(*request, response, controller, callback);
}

void RpcPeerProxy::StartRemoteBootstrap(const StartRemoteBootstrapRequestPB* request,
StartRemoteBootstrapResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
void RpcPeerProxy::StartRemoteBootstrap(
const StartRemoteBootstrapRequestPB* request, StartRemoteBootstrapResponsePB* response,
rpc::RpcController* controller, const rpc::ResponseCallback& callback) {
consensus_proxy_->StartRemoteBootstrapAsync(*request, response, controller, callback);
}

1 change: 1 addition & 0 deletions src/yb/consensus/consensus_peers.h
@@ -258,6 +258,7 @@ class Peer : public std::enable_shared_from_this<Peer> {
StartRemoteBootstrapResponsePB rb_response_;

rpc::RpcController controller_;
std::atomic<CoarseTimePoint> last_rpc_start_time_{CoarseTimePoint::min()};

// Held if there is an outstanding request. This is used in order to ensure that we only have a
// single request outstanding at a time, and to wait for the outstanding requests at Close().
98 changes: 83 additions & 15 deletions src/yb/rpc/connection.cc
@@ -32,6 +32,8 @@

#include "yb/rpc/connection.h"

#include <atomic>
#include <sstream>
#include <thread>
#include <utility>

@@ -47,6 +49,7 @@
#include "yb/rpc/rpc_introspection.pb.h"
#include "yb/rpc/rpc_metrics.h"

#include "yb/util/debug-util.h"
#include "yb/util/enums.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
@@ -121,7 +124,16 @@ std::string Connection::ReasonNotIdle() const {
return reason;
}

void Connection::Shutdown(const Status& status) {
void Connection::Shutdown(const Status& provided_status) {
if (provided_status.ok()) {
LOG_WITH_PREFIX(DFATAL)
<< "Connection shutdown called with an OK status, replacing with an error:\n"
<< GetStackTrace();
}
const Status status =
provided_status.ok()
? STATUS_FORMAT(RuntimeError, "Connection shutdown called with OK status")
: provided_status;
{
std::vector<OutboundDataPtr> outbound_data_being_processed;
{
@@ -131,15 +143,19 @@ void Connection::Shutdown(const Status& status) {
shutdown_status_ = status;
}

shutdown_time_.store(reactor_->cur_time(), std::memory_order_release);

auto self = shared_from_this();
for (auto& call : outbound_data_being_processed) {
call->Transferred(status, this);
call->Transferred(status, self);
}
}

context_->Shutdown(status);
stream_->Shutdown(status);

// Clear any calls which have been sent and were awaiting a response.
active_calls_during_shutdown_.store(active_calls_.size(), std::memory_order_release);
for (auto& v : active_calls_) {
if (v.call && !v.call->IsFinished()) {
v.call->SetFailed(status);
@@ -157,16 +173,19 @@ void Connection::OutboundQueued() {
auto status = stream_->TryWrite();
if (!status.ok()) {
VLOG_WITH_PREFIX(1) << "Write failed: " << status;
// Even though we are already on the reactor thread, try to schedule a task so that it would run
// later than all other already scheduled tasks, to preserve historical behavior.
auto scheduling_status = reactor_->ScheduleReactorTask(
MakeFunctorReactorTask(
std::bind(&Reactor::DestroyConnection, reactor_, this, status),
shared_from_this(), SOURCE_LOCATION()));
if (!scheduling_status.ok()) {
LOG(WARNING) << "Failed to schedule DestroyConnection: " << scheduling_status
<< "on reactor, destroying connection immediately";
reactor_->DestroyConnection(this, status);

if (!queued_destroy_connection_.exchange(true, std::memory_order_acq_rel)) {
// Even though we are already on the reactor thread, try to schedule a task so that it
// would run later than all other already scheduled tasks, to preserve historical
// behavior.
auto scheduling_status = reactor_->ScheduleReactorTask(MakeFunctorReactorTask(
std::bind(&Reactor::DestroyConnection, reactor_, this, status), shared_from_this(),
SOURCE_LOCATION()));
if (!scheduling_status.ok()) {
LOG(WARNING) << "Failed to schedule DestroyConnection: " << scheduling_status
<< "on reactor, destroying connection immediately";
reactor_->DestroyConnection(this, status);
}
}
}
}
@@ -280,7 +299,8 @@ size_t Connection::DoQueueOutboundData(OutboundDataPtr outbound_data, bool batch
YB_LOG_EVERY_N_SECS(INFO, 5) << "Connection::DoQueueOutboundData data: "
<< AsString(outbound_data) << " shutdown_status_: "
<< shutdown_status;
outbound_data->Transferred(shutdown_status, this);
outbound_data->Transferred(shutdown_status, shared_from_this());
calls_queued_after_shutdown_.fetch_add(1, std::memory_order_acq_rel);
return std::numeric_limits<size_t>::max();
}

@@ -372,6 +392,51 @@ std::string Connection::ToString() const {
FATAL_INVALID_ENUM_VALUE(Direction, direction_);
}

void Connection::QueueDumpConnectionState(int32_t call_id, const void* call_ptr) const {
auto task = MakeFunctorReactorTask(
std::bind(&Connection::DumpConnectionState, this, call_id, call_ptr), shared_from_this(),
SOURCE_LOCATION());
auto scheduling_status = reactor_->ScheduleReactorTask(task);
LOG_IF_WITH_PREFIX(DFATAL, !scheduling_status.ok())
<< "Failed to schedule call to dump connection state: " << scheduling_status;
}

void Connection::DumpConnectionState(int32_t call_id, const void* call_ptr) const {
auto earliest_expiry = active_calls_.empty()
? CoarseTimePoint()
: active_calls_.get<ExpirationTag>().begin()->expires_at;
auto found_call_id = active_calls_.find(call_id) != active_calls_.end();
LOG_WITH_PREFIX(INFO) << Format(
"LastActivityTime: $0, "
"ActiveCalls stats: { "
"during shutdown: $1, "
"current size: $2, "
"earliest expiry: $3, "
"}, "
"OutboundCall: { "
"ptr: $4, "
"call id: $5, "
"is active: $6 "
"}, "
"Shutdown status: $7, "
"Shutdown time: $8, "
"Queue attempts after shutdown: { "
"calls: $9, "
"responses: $10 "
"}",
/* $0 */ last_activity_time(),
/* $1 */ active_calls_during_shutdown_.load(std::memory_order_acquire),
/* $2 */ active_calls_.size(),
/* $3 */ earliest_expiry,
/* $4 */ call_ptr,
/* $5 */ call_id,
/* $6 */ found_call_id,
/* $7 */ shutdown_status_,
/* $8 */ shutdown_time_,
/* $9 */ calls_queued_after_shutdown_.load(std::memory_order_acquire),
/* $10 */ responses_queued_after_shutdown_.load(std::memory_order_acquire));
}

Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
RpcConnectionPB* resp) {
resp->set_remote_ip(yb::ToString(remote()));
@@ -429,6 +494,7 @@ Status Connection::QueueOutboundData(OutboundDataPtr outbound_data) {
std::bind(&OutboundData::Transferred, outbound_data, _2, /* conn */ nullptr),
SOURCE_LOCATION());
outbound_data_queue_lock.unlock();
responses_queued_after_shutdown_.fetch_add(1, std::memory_order_acq_rel);
auto scheduling_status = reactor_->ScheduleReactorTask(task, true /* even_if_not_running */);
LOG_IF_WITH_PREFIX(DFATAL, !scheduling_status.ok())
<< "Failed to schedule OutboundData::Transferred: " << scheduling_status;
@@ -520,11 +586,13 @@ void Connection::UpdateLastWrite() {
}

void Connection::Transferred(const OutboundDataPtr& data, const Status& status) {
data->Transferred(status, this);
data->Transferred(status, shared_from_this());
}

void Connection::Destroy(const Status& status) {
reactor_->DestroyConnection(this, status);
if (!queued_destroy_connection_.exchange(true, std::memory_order_acq_rel)) {
reactor_->DestroyConnection(this, status);
}
}

std::string Connection::LogPrefix() const {
15 changes: 14 additions & 1 deletion src/yb/rpc/connection.h
@@ -155,6 +155,11 @@ class Connection final : public StreamContext, public std::enable_shared_from_th
// Safe to be called from other threads.
std::string ToString() const;

// Dump the connection state for the provided call id. This includes information related to
// connection shutdown time, and whether the call id is present in the active_calls_ or not.
// call_ptr is used only for logging to correlate with OutboundCall trace.
void QueueDumpConnectionState(int32_t call_id, const void* call_ptr) const;

Direction direction() const { return direction_; }

// Queue a call response back to the client on the server side.
@@ -225,6 +230,8 @@ class Connection final : public StreamContext, public std::enable_shared_from_th
StreamReadBuffer& ReadBuffer() override;

void CleanupExpirationQueue(CoarseTimePoint now) ON_REACTOR_THREAD;
// call_ptr is used only for logging to correlate with OutboundCall trace.
void DumpConnectionState(int32_t call_id, const void* call_ptr) const ON_REACTOR_THREAD;

std::string LogPrefix() const;

@@ -299,6 +306,7 @@ class Connection final : public StreamContext, public std::enable_shared_from_th

// Starts as Status::OK, gets set to a shutdown status upon Shutdown().
Status shutdown_status_ GUARDED_BY(outbound_data_queue_mtx_);
std::atomic<CoarseTimePoint> shutdown_time_{CoarseTimePoint::min()};

std::shared_ptr<ReactorTask> process_response_queue_task_ GUARDED_BY(outbound_data_queue_mtx_);

@@ -307,9 +315,14 @@ class Connection final : public StreamContext, public std::enable_shared_from_th
// ----------------------------------------------------------------------------------------------

std::atomic<uint64_t> responded_call_count_{0};
std::atomic<size_t> active_calls_during_shutdown_{0};
std::atomic<size_t> calls_queued_after_shutdown_{0};
std::atomic<size_t> responses_queued_after_shutdown_{0};

// The last time we read or wrote from the socket.
std::atomic<CoarseTimePoint> last_activity_time_;
std::atomic<CoarseTimePoint> last_activity_time_{CoarseTimePoint::min()};

std::atomic<bool> queued_destroy_connection_{false};
};

} // namespace rpc
2 changes: 1 addition & 1 deletion src/yb/rpc/inbound_call.cc
@@ -88,7 +88,7 @@ InboundCall::~InboundCall() {
DecrementGauge(rpc_metrics_->inbound_calls_alive);
}

void InboundCall::NotifyTransferred(const Status& status, Connection* conn) {
void InboundCall::NotifyTransferred(const Status& status, const ConnectionPtr& conn) {
if (status.ok()) {
TRACE_TO(trace(), "Transfer finished");
} else {