Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the buggy Future and Promise implementations #299

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ void BinaryProtoLookupService::handlePartitionMetadataLookup(const std::string&
}

uint64_t BinaryProtoLookupService::newRequestId() {
Lock lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
return ++requestIdGenerator_;
}

Expand Down
194 changes: 78 additions & 116 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,162 +19,124 @@
#ifndef LIB_FUTURE_H_
#define LIB_FUTURE_H_

#include <condition_variable>
#include <atomic>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>

using Lock = std::unique_lock<std::mutex>;
#include <utility>

namespace pulsar {

template <typename Result, typename Type>
struct InternalState {
std::mutex mutex;
std::condition_variable condition;
Result result;
Type value;
bool complete;

std::list<typename std::function<void(Result, const Type&)> > listeners;
};

template <typename Result, typename Type>
class Future {
class InternalState {
public:
typedef std::function<void(Result, const Type&)> ListenerCallback;

Future& addListener(ListenerCallback callback) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);

if (state->complete) {
lock.unlock();
callback(state->result, state->value);
using Listener = std::function<void(Result, const Type &)>;
using Pair = std::pair<Result, Type>;

// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
InternalState() {}

void addListener(Listener listener) {
if (completed()) {
// Allow get_future() being called multiple times, only the 1st time will wait() be called to wait
// until all previous listeners are done.
try {
listenersPromise_.get_future().wait();
} catch (const std::future_error &e) {
if (e.code() != std::future_errc::future_already_retrieved) {
throw e;
}
}
listener(future_.get().first, future_.get().second);
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
} else {
state->listeners.push_back(callback);
std::lock_guard<std::mutex> lock{mutex_};
listeners_.emplace_back(listener);
RobertIndie marked this conversation as resolved.
Show resolved Hide resolved
}

return *this;
}

Result get(Type& result) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);

if (!state->complete) {
// Wait for result
while (!state->complete) {
state->condition.wait(lock);
}
bool complete(Result result, const Type &value) {
bool expected = false;
if (!completed_.compare_exchange_strong(expected, true)) {
return false;
}

result = state->value;
return state->result;
}
std::unique_lock<std::mutex> lock{mutex_};
decltype(listeners_) listeners;
listeners.swap(listeners_);
lock.unlock();

template <typename Duration>
bool get(Result& res, Type& value, Duration d) {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);

if (!state->complete) {
// Wait for result
while (!state->complete) {
if (!state->condition.wait_for(lock, d, [&state] { return state->complete; })) {
// Timeout while waiting for the future to complete
return false;
}
}
for (auto &&listener : listeners) {
listener(result, value);
}
// Notify the previous listeners are all done so that any listener added after completing will be
// called after the previous listeners.
listenersPromise_.set_value(true);

value = state->value;
res = state->result;
promise_.set_value(std::make_pair(result, value));
return true;
}

private:
typedef std::shared_ptr<InternalState<Result, Type> > InternalStatePtr;
Future(InternalStatePtr state) : state_(state) {}
bool completed() const noexcept { return completed_; }

std::shared_ptr<InternalState<Result, Type> > state_;
Result get(Type &result) {
auto pair = future_.get();
result = std::move(pair.second);
return pair.first;
}

template <typename U, typename V>
friend class Promise;
private:
std::atomic_bool completed_{false};
std::promise<Pair> promise_;
std::shared_future<Pair> future_{promise_.get_future()};

std::promise<bool> listenersPromise_;
std::list<Listener> listeners_;
mutable std::mutex mutex_;
};

template <typename Result, typename Type>
class Promise {
public:
Promise() : state_(std::make_shared<InternalState<Result, Type> >()) {}

bool setValue(const Type& value) const {
static Result DEFAULT_RESULT;
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
using InternalStatePtr = std::shared_ptr<InternalState<Result, Type>>;

if (state->complete) {
return false;
}

state->value = value;
state->result = DEFAULT_RESULT;
state->complete = true;

decltype(state->listeners) listeners;
listeners.swap(state->listeners);

lock.unlock();

for (auto& callback : listeners) {
callback(DEFAULT_RESULT, value);
}
template <typename Result, typename Type>
class Future {
public:
using Listener = typename InternalState<Result, Type>::Listener;

state->condition.notify_all();
return true;
Future &addListener(Listener listener) {
state_->addListener(listener);
return *this;
}

bool setFailed(Result result) const {
static Type DEFAULT_VALUE;
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
Result get(Type &result) { return state_->get(result); }

if (state->complete) {
return false;
}
private:
InternalStatePtr<Result, Type> state_;

state->result = result;
state->complete = true;
Future(InternalStatePtr<Result, Type> state) : state_(state) {}

decltype(state->listeners) listeners;
listeners.swap(state->listeners);
template <typename U, typename V>
friend class Promise;
};

lock.unlock();
template <typename Result, typename Type>
class Promise {
public:
Promise() : state_(std::make_shared<InternalState<Result, Type>>()) {}

for (auto& callback : listeners) {
callback(result, DEFAULT_VALUE);
}
bool setValue(const Type &value) const { return state_->complete({}, value); }

state->condition.notify_all();
return true;
}
bool setFailed(Result result) const { return state_->complete(result, {}); }

bool isComplete() const {
InternalState<Result, Type>* state = state_.get();
Lock lock(state->mutex);
return state->complete;
}
bool isComplete() const { return state_->completed(); }

Future<Result, Type> getFuture() const { return Future<Result, Type>(state_); }
Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }

private:
typedef std::function<void(Result, const Type&)> ListenerCallback;
std::shared_ptr<InternalState<Result, Type> > state_;
const InternalStatePtr<Result, Type> state_;
};

class Void {};

} /* namespace pulsar */
} // namespace pulsar

#endif /* LIB_FUTURE_H_ */
#endif
6 changes: 3 additions & 3 deletions lib/stats/ProducerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
return;
}

Lock lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
std::ostringstream oss;
oss << *this;
numMsgsSent_ = 0;
Expand All @@ -86,7 +86,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
}

void ProducerStatsImpl::messageSent(const Message& msg) {
Lock lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
numMsgsSent_++;
totalMsgsSent_++;
numBytesSent_ += msg.getLength();
Expand All @@ -96,7 +96,7 @@ void ProducerStatsImpl::messageSent(const Message& msg) {
void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::ptime& publishTime) {
boost::posix_time::ptime currentTime = boost::posix_time::microsec_clock::universal_time();
double diffInMicros = (currentTime - publishTime).total_microseconds();
Lock lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
totalLatencyAccumulator_(diffInMicros);
latencyAccumulator_(diffInMicros);
sendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor
Expand Down
4 changes: 2 additions & 2 deletions tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ TEST(BasicEndToEndTest, testBatchMessages) {
}

void resendMessage(Result r, const MessageId msgId, Producer producer) {
Lock lock(mutex_);
std::unique_lock<std::mutex> lock(mutex_);
if (r != ResultOk) {
LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount);
if (++globalResendMessageCount >= 3) {
Expand Down Expand Up @@ -993,7 +993,7 @@ TEST(BasicEndToEndTest, testResendViaSendCallback) {
// 3 seconds
std::this_thread::sleep_for(std::chrono::microseconds(3 * 1000 * 1000));
producer.close();
Lock lock(mutex_);
std::lock_guard<std::mutex> lock(mutex_);
ASSERT_GE(globalResendMessageCount, 3);
}

Expand Down