[#24000] DocDB: Shutting down shared exchange could cause TServer to hang

Summary:
When the shared exchange should finish, we set its state to Shutdown and notify the exchange about it.
However, a concurrent thread could overwrite that state, so the shutdown would be missed.

To avoid that, all state transitions except Shutdown now use compare_exchange_strong.
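
The idea, as a minimal standalone sketch (kIdle/kRequestSent/kResponseSent appear in the diff below; the kShutdown name and the surrounding scaffolding are illustrative assumptions, not the real SharedExchangeHeader code):

#include <atomic>
#include <iostream>

// kIdle/kRequestSent/kResponseSent are taken from the diff; kShutdown is the
// shutdown state the summary refers to (name assumed here).
enum class SharedExchangeState { kIdle, kRequestSent, kResponseSent, kShutdown };

std::atomic<SharedExchangeState> state_{SharedExchangeState::kIdle};

// Old pattern: an unconditional store would overwrite a concurrently set
// kShutdown, so the shutdown request could be lost:
//   state_.store(SharedExchangeState::kRequestSent, std::memory_order_release);

// New pattern: the transition only succeeds if the state is still the one we
// expect, so a concurrently stored kShutdown survives and is reported.
bool TransferState(SharedExchangeState expected, SharedExchangeState desired) {
  return state_.compare_exchange_strong(expected, desired, std::memory_order_acq_rel);
}

int main() {
  state_.store(SharedExchangeState::kShutdown);  // another thread requested shutdown
  if (!TransferState(SharedExchangeState::kIdle, SharedExchangeState::kRequestSent)) {
    std::cout << "transition rejected, shutdown preserved\n";
  }
  return 0;
}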

Also, a PgClient session was being destroyed while holding the common mutex over all sessions.
Since destroying the session also destroys its shared exchange, it could hang while holding this mutex.
To avoid a total TServer hang when destroying an exchange goes wrong, session destruction was moved out of this mutex.
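
A minimal sketch of that locking pattern with simplified types (the real code selects expired entries via session_expiration_queue_ rather than swapping the whole container):

#include <memory>
#include <mutex>
#include <vector>

// Simplified stand-ins: the real code keeps SessionInfoPtr entries in a
// container keyed by id and picks expired ones via session_expiration_queue_.
struct Session {};  // owns a shared exchange in the real implementation
using SessionPtr = std::shared_ptr<Session>;

std::mutex mutex_;                  // guards sessions_
std::vector<SessionPtr> sessions_;  // all live sessions

void CheckExpiredSessionsSketch() {
  std::vector<SessionPtr> expired;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    // Under the lock we only move expired entries out of the shared container.
    expired.swap(sessions_);
  }
  // The lock is released here. Destroying the sessions (and with them their
  // shared exchanges) happens outside mutex_, so a hang while tearing down an
  // exchange can no longer block every other session operation on the TServer.
  expired.clear();
}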

Also, pg_client_use_shared_memory now defaults to false on Mac, since the Mac semaphore implementation is quite slow (a single round trip takes 10 ms) and initdb takes 200+ seconds.
Jira: DB-12888

Test Plan: Jenkins

Reviewers: rthallam, esheng

Reviewed By: esheng

Subscribers: ybase, yql

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D38207
spolitov committed Sep 23, 2024
1 parent 09f7a0f commit 555af7d
Showing 3 changed files with 52 additions and 24 deletions.
51 changes: 31 additions & 20 deletions src/yb/tserver/pg_client_service.cc
@@ -88,7 +88,7 @@ using namespace std::literals;
 DEFINE_UNKNOWN_uint64(pg_client_session_expiration_ms, 60000,
     "Pg client session expiration time in milliseconds.");
 
-DEFINE_RUNTIME_bool(pg_client_use_shared_memory, !yb::kIsDebug,
+DEFINE_RUNTIME_bool(pg_client_use_shared_memory, !yb::kIsDebug && !yb::kIsMac,
     "Use shared memory for executing read and write pg client queries");
 
 DEFINE_RUNTIME_int32(get_locks_status_max_retry_attempts, 2,
@@ -1829,32 +1829,43 @@ class PgClientServiceImpl::Impl {

   void CheckExpiredSessions() {
     auto now = CoarseMonoClock::now();
-    std::vector<uint64_t> expired_sessions;
-    std::lock_guard lock(mutex_);
-    while (!session_expiration_queue_.empty()) {
-      auto& top = session_expiration_queue_.top();
-      if (top.first > now) {
-        break;
-      }
-      auto id = top.second;
-      session_expiration_queue_.pop();
-      auto it = sessions_.find(id);
-      if (it != sessions_.end()) {
-        auto current_expiration = (**it).session().expiration();
-        if (current_expiration > now) {
-          session_expiration_queue_.push({current_expiration, id});
-        } else {
-          expired_sessions.push_back(id);
-          sessions_.erase(it);
+    std::vector<SessionInfoPtr> expired_sessions;
+    {
+      std::lock_guard lock(mutex_);
+      while (!session_expiration_queue_.empty()) {
+        auto& top = session_expiration_queue_.top();
+        if (top.first > now) {
+          break;
+        }
+        auto id = top.second;
+        session_expiration_queue_.pop();
+        auto it = sessions_.find(id);
+        if (it != sessions_.end()) {
+          auto current_expiration = (**it).session().expiration();
+          if (current_expiration > now) {
+            session_expiration_queue_.push({current_expiration, id});
+          } else {
+            expired_sessions.push_back(*it);
+            sessions_.erase(it);
+          }
+        }
+      }
+      ScheduleCheckExpiredSessions(now);
+    }
     if (expired_sessions.empty()) {
       return;
     }
     auto cdc_service = tablet_server_.GetCDCService();
     // We only want to call this on tablet servers. On master, cdc_service will be null.
     if (cdc_service) {
-      cdc_service->DestroyVirtualWALBatchForCDC(expired_sessions);
+      std::vector<uint64_t> expired_session_ids;
+      expired_session_ids.reserve(expired_sessions.size());
+      for (auto& session : expired_sessions) {
+        expired_session_ids.push_back(session->id());
+      }
+      expired_sessions.clear();
+      cdc_service->DestroyVirtualWALBatchForCDC(expired_session_ids);
     }
-    ScheduleCheckExpiredSessions(now);
   }
 
   Status DoPerform(PgPerformRequestPB* req, PgPerformResponsePB* resp, rpc::RpcContext* context) {
19 changes: 15 additions & 4 deletions src/yb/tserver/tserver_shared_mem.cc
@@ -172,24 +172,33 @@ class SharedExchangeHeader {

   Status SendRequest(bool failed_previous_request, size_t size) {
     auto state = state_.load(std::memory_order_acquire);
-    if (!ReadyToSend(failed_previous_request)) {
+    if (!ReadyToSend(state, failed_previous_request)) {
       return STATUS_FORMAT(IllegalState, "Send request in wrong state: $0", state);
     }
     if (ANNOTATE_UNPROTECTED_READ(FLAGS_TEST_pg_client_crash_on_shared_memory_send)) {
       LOG(FATAL) << "For test: crashing while sending request";
     }
-    state_.store(SharedExchangeState::kRequestSent, std::memory_order_release);
+    RETURN_NOT_OK(TransferState(state, SharedExchangeState::kRequestSent));
     data_size_ = size;
     return request_semaphore_.Post();
   }
 
+  Status TransferState(SharedExchangeState old_state, SharedExchangeState new_state) {
+    SharedExchangeState actual_state = old_state;
+    if (state_.compare_exchange_strong(actual_state, new_state, std::memory_order_acq_rel)) {
+      return Status::OK();
+    }
+    return STATUS_FORMAT(
+        IllegalState, "Wrong state, $0 expected, but $1 found", old_state, actual_state);
+  }
+
   bool ResponseReady() {
     return state_.load(std::memory_order_acquire) == SharedExchangeState::kResponseSent;
   }
 
   Result<size_t> FetchResponse(std::chrono::system_clock::time_point deadline) {
     RETURN_NOT_OK(DoWait(SharedExchangeState::kResponseSent, deadline, &response_semaphore_));
-    state_.store(SharedExchangeState::kIdle, std::memory_order_release);
+    RETURN_NOT_OK(TransferState(SharedExchangeState::kResponseSent, SharedExchangeState::kIdle));
     return data_size_;
   }

@@ -202,7 +211,9 @@ class SharedExchangeHeader {
     }
 
     data_size_ = size;
-    state_.store(SharedExchangeState::kResponseSent, std::memory_order_release);
+    WARN_NOT_OK(
+        TransferState(SharedExchangeState::kRequestSent, SharedExchangeState::kResponseSent),
+        "Transfer state failed");
     WARN_NOT_OK(response_semaphore_.Post(), "Respond failed");
   }

6 changes: 6 additions & 0 deletions src/yb/util/debug.h
@@ -20,4 +20,10 @@ constexpr bool kIsDebug = false;
 constexpr bool kIsDebug = true;
 #endif
 
+#if defined(__APPLE__)
+constexpr bool kIsMac = true;
+#else
+constexpr bool kIsMac = false;
+#endif
+
 } // namespace yb
