diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 2b0647d70df2..4d821860a4a1 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -1364,8 +1364,8 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request, clock_->Update(HybridTime(request->propagated_hybrid_time())); } - Synchronizer log_synchronizer; - StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback(); + auto log_synchronizer = std::make_shared(); + StatusCallback sync_status_cb = Synchronizer::AsStatusCallback(log_synchronizer); // The ordering of the following operations is crucial, read on for details. // @@ -1516,7 +1516,7 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request, // Release the lock while we wait for the log append to finish so that commits can go through. // We'll re-acquire it before we update the state again. - RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, &log_synchronizer)); + RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, log_synchronizer.get())); if (PREDICT_FALSE(VLOG_IS_ON(2))) { VLOG_WITH_PREFIX(2) << "Replica updated." diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index 7f7e3c7ddd78..57fada46f0fc 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -217,6 +217,7 @@ set(UTIL_SRCS uuid.cc varint.cc version_info.cc + async_util.cc ${UTIL_SRCS_EXTENSIONS} ) diff --git a/src/yb/util/async_util.cc b/src/yb/util/async_util.cc new file mode 100644 index 000000000000..655cb01813d7 --- /dev/null +++ b/src/yb/util/async_util.cc @@ -0,0 +1,109 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. + +#include "yb/util/async_util.h" + +#include "yb/util/logging.h" + +namespace yb { + +namespace { + +void CallStatusCBMaybe(std::weak_ptr weak_sync, const Status& status) { + auto sync = weak_sync.lock(); + if (sync) { + sync->StatusCB(status); + } +} + +} // anonymous namespace + +Synchronizer::~Synchronizer() { + EnsureWaitDone(); +} + +void Synchronizer::StatusCB(const Status& status) { + std::lock_guard lock(mutex_); + if (!assigned_) { + assigned_ = true; + status_ = status; + cond_.notify_all(); + } else { + LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status; + } +} + +StatusCallback Synchronizer::AsStatusCallback() { + DCHECK(!assigned_); + + // Cannot destroy the synchronizer without calling Wait(). + must_wait_ = true; + return Bind(&Synchronizer::StatusCB, Unretained(this)); +} + +StdStatusCallback Synchronizer::AsStdStatusCallback() { + DCHECK(!assigned_); + + // Cannot destroy the synchronizer without calling Wait(). + must_wait_ = true; + return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); +} + +StatusCallback Synchronizer::AsStatusCallback(const std::shared_ptr& synchronizer) { + DCHECK(!synchronizer->assigned_); + // No need to set must_wait_ here -- the callback knows whether Synchronizer still exists. + std::weak_ptr weak_sync(synchronizer); + return Bind(CallStatusCBMaybe, weak_sync); +} + +Status Synchronizer::WaitUntil(const std::chrono::steady_clock::time_point& time) { + std::unique_lock lock(mutex_); + auto predicate = [this] { return assigned_; }; + if (time == std::chrono::steady_clock::time_point::max()) { + cond_.wait(lock, predicate); + } else if (!cond_.wait_until(lock, time, predicate)) { + return STATUS(TimedOut, "Timed out while waiting for the callback to be called."); + } + + // The callback that keep a pointer to this potentially stack-allocated synchronizer has been + // called, assuming there was only one such callback. OK for the synchronizer to go out of + // scope. + must_wait_ = false; + + return status_; +} + +void Synchronizer::Reset() { + std::lock_guard lock(mutex_); + EnsureWaitDone(); + assigned_ = false; + status_ = Status::OK(); + must_wait_ = false; +} + +void Synchronizer::EnsureWaitDone() { + if (must_wait_) { + static const char* kErrorMsg = + "Synchronizer went out of scope, Wait() has returned success, callbacks may " + "access invalid memory!"; + +#ifndef NDEBUG + LOG(FATAL) << kErrorMsg; +#else + const int kWaitSec = 10; + YB_LOG_EVERY_N_SECS(ERROR, 1) << kErrorMsg << " Waiting up to " << kWaitSec << " seconds"; + CHECK_OK(WaitFor(MonoDelta::FromSeconds(kWaitSec))); +#endif + } +} + +} // namespace yb diff --git a/src/yb/util/async_util.h b/src/yb/util/async_util.h index 26c8c46effd0..455e4897e780 100644 --- a/src/yb/util/async_util.h +++ b/src/yb/util/async_util.h @@ -58,39 +58,20 @@ class Synchronizer { void operator=(const Synchronizer&) = delete; Synchronizer() {} + ~Synchronizer(); - void StatusCB(const Status& status) { - std::lock_guard lock(mutex_); - if (!assigned_) { - assigned_ = true; - status_ = status; - cond_.notify_all(); - } else { - LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status; - } - } - - StatusCallback AsStatusCallback() { - DCHECK(!assigned_); + void StatusCB(const Status& status); - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. - return Bind(&Synchronizer::StatusCB, Unretained(this)); - } + // Use this for synchronizers declared on the stack. The callback does not take a reference to + // its synchronizer, so the returned callback _must_ go out of scope before its synchronizer. + StatusCallback AsStatusCallback(); - StdStatusCallback AsStdStatusCallback() { - DCHECK(!assigned_); + // Same semantics as AsStatusCallback. + StdStatusCallback AsStdStatusCallback(); - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. - return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); - } + // This version of AsStatusCallback is for cases when the callback can outlive the synchronizer. + // The callback holds a weak pointer to the synchronizer. + static StatusCallback AsStatusCallback(const std::shared_ptr& synchronizer); boost::function AsStatusFunctor() { return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); @@ -104,28 +85,26 @@ class Synchronizer { return WaitUntil(std::chrono::steady_clock::now() + delta.ToSteadyDuration()); } - CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time) { - std::unique_lock lock(mutex_); - auto predicate = [this] { return assigned_; }; - if (time == std::chrono::steady_clock::time_point::max()) { - cond_.wait(lock, predicate); - } else if (!cond_.wait_until(lock, time, predicate)) { - return STATUS(TimedOut, "Timed out while waiting for the callback to be called."); - } - - return status_; - } + CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time); - void Reset() { - std::lock_guard lock(mutex_); - assigned_ = false; - status_ = Status::OK(); - } + void Reset(); private: + + // Invoked in the destructor and in Reset() to make sure Wait() was invoked if it had to be. + void EnsureWaitDone(); + std::mutex mutex_; std::condition_variable cond_; bool assigned_ = false; + + // If we've created a callback and given it out to an asynchronous operation, we must call Wait() + // on the synchronizer before destroying it. Not doing any locking around this variable because + // Wait() is supposed to be called on the same thread as AsStatusCallback(), or with adequate + // synchronization after that. Most frequently Wait() is called right after creating the + // synchronizer. + bool must_wait_ = false; + Status status_; };