From 0d07add220f536fcf3b27402ea0086776198fa79 Mon Sep 17 00:00:00 2001 From: Mikhail Bautin Date: Fri, 27 Apr 2018 17:33:36 -0700 Subject: [PATCH] #239 Callback should keep a reference to Synchronizer in UpdateReplica Summary: In UpdateReplica we use a synchronizer, get a callback from it, and pass it to the log append logic. In some cases UpdateReplica can return earlier, and the callback will try to reference invalid memory. To fix this, we manage the synchronizer using a shared pointer in that case. Also we add a check that if the original version of AsStatusCallback is called, Wait() must have been called by the time the synchronizer is destroyed. Test Plan: Jenkins Reviewers: sergei, kannan, amitanand Reviewed By: amitanand Subscribers: ybase, bharat Differential Revision: https://phabricator.dev.yugabyte.com/D4699 --- src/yb/consensus/raft_consensus.cc | 6 +- src/yb/util/CMakeLists.txt | 1 + src/yb/util/async_util.cc | 109 +++++++++++++++++++++++++++++ src/yb/util/async_util.h | 69 +++++++----------- 4 files changed, 137 insertions(+), 48 deletions(-) create mode 100644 src/yb/util/async_util.cc diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 2b0647d70df2..4d821860a4a1 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -1364,8 +1364,8 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request, clock_->Update(HybridTime(request->propagated_hybrid_time())); } - Synchronizer log_synchronizer; - StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback(); + auto log_synchronizer = std::make_shared<Synchronizer>(); + StatusCallback sync_status_cb = Synchronizer::AsStatusCallback(log_synchronizer); // The ordering of the following operations is crucial, read on for details. // @@ -1516,7 +1516,7 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request, // Release the lock while we wait for the log append to finish so that commits can go through. 
// We'll re-acquire it before we update the state again. - RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, &log_synchronizer)); + RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, log_synchronizer.get())); if (PREDICT_FALSE(VLOG_IS_ON(2))) { VLOG_WITH_PREFIX(2) << "Replica updated." diff --git a/src/yb/util/CMakeLists.txt b/src/yb/util/CMakeLists.txt index 7f7e3c7ddd78..57fada46f0fc 100644 --- a/src/yb/util/CMakeLists.txt +++ b/src/yb/util/CMakeLists.txt @@ -217,6 +217,7 @@ set(UTIL_SRCS uuid.cc varint.cc version_info.cc + async_util.cc ${UTIL_SRCS_EXTENSIONS} ) diff --git a/src/yb/util/async_util.cc b/src/yb/util/async_util.cc new file mode 100644 index 000000000000..655cb01813d7 --- /dev/null +++ b/src/yb/util/async_util.cc @@ -0,0 +1,109 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+ +#include "yb/util/async_util.h" + +#include "yb/util/logging.h" + +namespace yb { + +namespace { + +void CallStatusCBMaybe(std::weak_ptr<Synchronizer> weak_sync, const Status& status) { + auto sync = weak_sync.lock(); + if (sync) { + sync->StatusCB(status); + } +} + +} // anonymous namespace + +Synchronizer::~Synchronizer() { + EnsureWaitDone(); +} + +void Synchronizer::StatusCB(const Status& status) { + std::lock_guard<std::mutex> lock(mutex_); + if (!assigned_) { + assigned_ = true; + status_ = status; + cond_.notify_all(); + } else { + LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status; + } +} + +StatusCallback Synchronizer::AsStatusCallback() { + DCHECK(!assigned_); + + // Cannot destroy the synchronizer without calling Wait(). + must_wait_ = true; + return Bind(&Synchronizer::StatusCB, Unretained(this)); +} + +StdStatusCallback Synchronizer::AsStdStatusCallback() { + DCHECK(!assigned_); + + // Cannot destroy the synchronizer without calling Wait(). + must_wait_ = true; + return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); +} + +StatusCallback Synchronizer::AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer) { + DCHECK(!synchronizer->assigned_); + // No need to set must_wait_ here -- the callback knows whether Synchronizer still exists. + std::weak_ptr<Synchronizer> weak_sync(synchronizer); + return Bind(CallStatusCBMaybe, weak_sync); +} + +Status Synchronizer::WaitUntil(const std::chrono::steady_clock::time_point& time) { + std::unique_lock<std::mutex> lock(mutex_); + auto predicate = [this] { return assigned_; }; + if (time == std::chrono::steady_clock::time_point::max()) { + cond_.wait(lock, predicate); + } else if (!cond_.wait_until(lock, time, predicate)) { + return STATUS(TimedOut, "Timed out while waiting for the callback to be called."); + } + + // The callback that keeps a pointer to this potentially stack-allocated synchronizer has been + // called, assuming there was only one such callback. OK for the synchronizer to go out of + // scope. 
+ must_wait_ = false; + + return status_; +} + +void Synchronizer::Reset() { + std::lock_guard<std::mutex> lock(mutex_); + EnsureWaitDone(); + assigned_ = false; + status_ = Status::OK(); + must_wait_ = false; +} + +void Synchronizer::EnsureWaitDone() { + if (must_wait_) { + static const char* kErrorMsg = + "Synchronizer went out of scope, Wait() has returned success, callbacks may " + "access invalid memory!"; + +#ifndef NDEBUG + LOG(FATAL) << kErrorMsg; +#else + const int kWaitSec = 10; + YB_LOG_EVERY_N_SECS(ERROR, 1) << kErrorMsg << " Waiting up to " << kWaitSec << " seconds"; + CHECK_OK(WaitFor(MonoDelta::FromSeconds(kWaitSec))); +#endif + } +} + +} // namespace yb diff --git a/src/yb/util/async_util.h b/src/yb/util/async_util.h index 26c8c46effd0..455e4897e780 100644 --- a/src/yb/util/async_util.h +++ b/src/yb/util/async_util.h @@ -58,39 +58,20 @@ class Synchronizer { void operator=(const Synchronizer&) = delete; Synchronizer() {} + ~Synchronizer(); - void StatusCB(const Status& status) { - std::lock_guard<std::mutex> lock(mutex_); - if (!assigned_) { - assigned_ = true; - status_ = status; - cond_.notify_all(); - } else { - LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status; - } - } - - StatusCallback AsStatusCallback() { - DCHECK(!assigned_); + void StatusCB(const Status& status); - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. - return Bind(&Synchronizer::StatusCB, Unretained(this)); - } + // Use this for synchronizers declared on the stack. The callback does not take a reference to + // its synchronizer, so the returned callback _must_ go out of scope before its synchronizer. + StatusCallback AsStatusCallback(); - StdStatusCallback AsStdStatusCallback() { - DCHECK(!assigned_); + // Same semantics as AsStatusCallback. 
+ StdStatusCallback AsStdStatusCallback(); - // Synchronizers are often declared on the stack, so it doesn't make - // sense for a callback to take a reference to its synchronizer. - // - // Note: this means the returned callback _must_ go out of scope before - // its synchronizer. - return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); - } + // This version of AsStatusCallback is for cases when the callback can outlive the synchronizer. + // The callback holds a weak pointer to the synchronizer. + static StatusCallback AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer); boost::function<void(const Status&)> AsStatusFunctor() { return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1); @@ -104,28 +85,26 @@ class Synchronizer { return WaitUntil(std::chrono::steady_clock::now() + delta.ToSteadyDuration()); } - CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time) { - std::unique_lock<std::mutex> lock(mutex_); - auto predicate = [this] { return assigned_; }; - if (time == std::chrono::steady_clock::time_point::max()) { - cond_.wait(lock, predicate); - } else if (!cond_.wait_until(lock, time, predicate)) { - return STATUS(TimedOut, "Timed out while waiting for the callback to be called."); - } - - return status_; - } + CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time); - void Reset() { - std::lock_guard<std::mutex> lock(mutex_); - assigned_ = false; - status_ = Status::OK(); - } + void Reset(); private: + + // Invoked in the destructor and in Reset() to make sure Wait() was invoked if it had to be. + void EnsureWaitDone(); + std::mutex mutex_; std::condition_variable cond_; bool assigned_ = false; + + // If we've created a callback and given it out to an asynchronous operation, we must call Wait() + // on the synchronizer before destroying it. Not doing any locking around this variable because + // Wait() is supposed to be called on the same thread as AsStatusCallback(), or with adequate + // synchronization after that. 
Most frequently Wait() is called right after creating the + // synchronizer. + bool must_wait_ = false; + Status status_; };