Skip to content

Commit

Permalink
#239 Callback should keep a reference to Synchronizer in UpdateReplica
Browse files Browse the repository at this point in the history
Summary:
In UpdateReplica we use a synchronizer, get a callback from it, and pass it to the log append logic.
In some cases UpdateReplica can return earlier, and the callback will try to reference invalid
memory. To fix this, we manage the synchronizer using a shared pointer in that case. Also we add
a check that if the original version of AsStatusCallback is called, Wait() must have been called by
the time the synchronizer is destroyed.

Test Plan: Jenkins

Reviewers: sergei, kannan, amitanand

Reviewed By: amitanand

Subscribers: ybase, bharat

Differential Revision: https://phabricator.dev.yugabyte.com/D4699
  • Loading branch information
mbautin committed Apr 28, 2018
1 parent 10338ff commit 0d07add
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 48 deletions.
6 changes: 3 additions & 3 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,8 +1364,8 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request,
clock_->Update(HybridTime(request->propagated_hybrid_time()));
}

Synchronizer log_synchronizer;
StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();
auto log_synchronizer = std::make_shared<Synchronizer>();
StatusCallback sync_status_cb = Synchronizer::AsStatusCallback(log_synchronizer);

// The ordering of the following operations is crucial, read on for details.
//
Expand Down Expand Up @@ -1516,7 +1516,7 @@ Status RaftConsensus::UpdateReplica(ConsensusRequestPB* request,
// Release the lock while we wait for the log append to finish so that commits can go through.
// We'll re-acquire it before we update the state again.

RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, &log_synchronizer));
RETURN_NOT_OK(WaitWritesUnlocked(deduped_req, log_synchronizer.get()));

if (PREDICT_FALSE(VLOG_IS_ON(2))) {
VLOG_WITH_PREFIX(2) << "Replica updated."
Expand Down
1 change: 1 addition & 0 deletions src/yb/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ set(UTIL_SRCS
uuid.cc
varint.cc
version_info.cc
async_util.cc
${UTIL_SRCS_EXTENSIONS}
)

Expand Down
109 changes: 109 additions & 0 deletions src/yb/util/async_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include "yb/util/async_util.h"

#include "yb/util/logging.h"

namespace yb {

namespace {

void CallStatusCBMaybe(std::weak_ptr<Synchronizer> weak_sync, const Status& status) {
auto sync = weak_sync.lock();
if (sync) {
sync->StatusCB(status);
}
}

} // anonymous namespace

Synchronizer::~Synchronizer() {
EnsureWaitDone();
}

void Synchronizer::StatusCB(const Status& status) {
std::lock_guard<std::mutex> lock(mutex_);
if (!assigned_) {
assigned_ = true;
status_ = status;
cond_.notify_all();
} else {
LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status;
}
}

StatusCallback Synchronizer::AsStatusCallback() {
DCHECK(!assigned_);

// Cannot destroy the synchronizer without calling Wait().
must_wait_ = true;
return Bind(&Synchronizer::StatusCB, Unretained(this));
}

StdStatusCallback Synchronizer::AsStdStatusCallback() {
DCHECK(!assigned_);

// Cannot destroy the synchronizer without calling Wait().
must_wait_ = true;
return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
}

StatusCallback Synchronizer::AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer) {
DCHECK(!synchronizer->assigned_);
// No need to set must_wait_ here -- the callback knows whether Synchronizer still exists.
std::weak_ptr<Synchronizer> weak_sync(synchronizer);
return Bind(CallStatusCBMaybe, weak_sync);
}

Status Synchronizer::WaitUntil(const std::chrono::steady_clock::time_point& time) {
std::unique_lock<std::mutex> lock(mutex_);
auto predicate = [this] { return assigned_; };
if (time == std::chrono::steady_clock::time_point::max()) {
cond_.wait(lock, predicate);
} else if (!cond_.wait_until(lock, time, predicate)) {
return STATUS(TimedOut, "Timed out while waiting for the callback to be called.");
}

// The callback that keep a pointer to this potentially stack-allocated synchronizer has been
// called, assuming there was only one such callback. OK for the synchronizer to go out of
// scope.
must_wait_ = false;

return status_;
}

void Synchronizer::Reset() {
std::lock_guard<std::mutex> lock(mutex_);
EnsureWaitDone();
assigned_ = false;
status_ = Status::OK();
must_wait_ = false;
}

void Synchronizer::EnsureWaitDone() {
if (must_wait_) {
static const char* kErrorMsg =
"Synchronizer went out of scope, Wait() has returned success, callbacks may "
"access invalid memory!";

#ifndef NDEBUG
LOG(FATAL) << kErrorMsg;
#else
const int kWaitSec = 10;
YB_LOG_EVERY_N_SECS(ERROR, 1) << kErrorMsg << " Waiting up to " << kWaitSec << " seconds";
CHECK_OK(WaitFor(MonoDelta::FromSeconds(kWaitSec)));
#endif
}
}

} // namespace yb
69 changes: 24 additions & 45 deletions src/yb/util/async_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,20 @@ class Synchronizer {
void operator=(const Synchronizer&) = delete;

Synchronizer() {}
~Synchronizer();

void StatusCB(const Status& status) {
std::lock_guard<std::mutex> lock(mutex_);
if (!assigned_) {
assigned_ = true;
status_ = status;
cond_.notify_all();
} else {
LOG(DFATAL) << "Status already assigned, existing: " << status_ << ", new: " << status;
}
}

StatusCallback AsStatusCallback() {
DCHECK(!assigned_);
void StatusCB(const Status& status);

// Synchronizers are often declared on the stack, so it doesn't make
// sense for a callback to take a reference to its synchronizer.
//
// Note: this means the returned callback _must_ go out of scope before
// its synchronizer.
return Bind(&Synchronizer::StatusCB, Unretained(this));
}
// Use this for synchronizers declared on the stack. The callback does not take a reference to
// its synchronizer, so the returned callback _must_ go out of scope before its synchronizer.
StatusCallback AsStatusCallback();

StdStatusCallback AsStdStatusCallback() {
DCHECK(!assigned_);
// Same semantics as AsStatusCallback.
StdStatusCallback AsStdStatusCallback();

// Synchronizers are often declared on the stack, so it doesn't make
// sense for a callback to take a reference to its synchronizer.
//
// Note: this means the returned callback _must_ go out of scope before
// its synchronizer.
return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
}
// This version of AsStatusCallback is for cases when the callback can outlive the synchronizer.
// The callback holds a weak pointer to the synchronizer.
static StatusCallback AsStatusCallback(const std::shared_ptr<Synchronizer>& synchronizer);

boost::function<void(const Status&)> AsStatusFunctor() {
return std::bind(&Synchronizer::StatusCB, this, std::placeholders::_1);
Expand All @@ -104,28 +85,26 @@ class Synchronizer {
return WaitUntil(std::chrono::steady_clock::now() + delta.ToSteadyDuration());
}

CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time) {
std::unique_lock<std::mutex> lock(mutex_);
auto predicate = [this] { return assigned_; };
if (time == std::chrono::steady_clock::time_point::max()) {
cond_.wait(lock, predicate);
} else if (!cond_.wait_until(lock, time, predicate)) {
return STATUS(TimedOut, "Timed out while waiting for the callback to be called.");
}

return status_;
}
CHECKED_STATUS WaitUntil(const std::chrono::steady_clock::time_point& time);

void Reset() {
std::lock_guard<std::mutex> lock(mutex_);
assigned_ = false;
status_ = Status::OK();
}
void Reset();

private:

// Invoked in the destructor and in Reset() to make sure Wait() was invoked if it had to be.
void EnsureWaitDone();

std::mutex mutex_;
std::condition_variable cond_;
bool assigned_ = false;

// If we've created a callback and given it out to an asynchronous operation, we must call Wait()
// on the synchronizer before destroying it. Not doing any locking around this variable because
// Wait() is supposed to be called on the same thread as AsStatusCallback(), or with adequate
// synchronization after that. Most frequently Wait() is called right after creating the
// synchronizer.
bool must_wait_ = false;

Status status_;
};

Expand Down

0 comments on commit 0d07add

Please sign in to comment.