Skip to content

Commit

Permalink
Make stats timers thread safe to use
Browse files Browse the repository at this point in the history
### Motivation

The timers' callbacks in `ProducerStatsImpl` and `ConsumerStatsImpl` are
not thread safe because they both capture the `this` pointer, while when
the callback is called in the event loop, `this` might point to an
expired instance.

### Modifications

- Capture the weak pointer instead of `this` in these callbacks. Since
  we cannot call `shared_from_this()` in the constructor, a `start()`
  method is added.
- Remove the useless `executor_` field and unnecessary null check for
  `timer_`.
  • Loading branch information
BewareMyPower committed Mar 20, 2023
1 parent 998c73d commit 8242c4b
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 35 deletions.
1 change: 1 addition & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
} else {
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>();
}
consumerStatsBasePtr_->start();

// Create msgCrypto
if (conf.isEncryptionEnabled()) {
Expand Down
1 change: 1 addition & 0 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
} else {
producerStatsBasePtr_ = std::make_shared<ProducerStatsDisabled>();
}
producerStatsBasePtr_->start();

if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ConsumerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace pulsar {
class ConsumerStatsBase {
public:
virtual void start() {}
virtual void receivedMessage(Message&, Result) = 0;
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0;
virtual ~ConsumerStatsBase() {}
Expand Down
32 changes: 18 additions & 14 deletions lib/stats/ConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ using Lock = std::unique_lock<std::mutex>;
ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, ExecutorServicePtr executor,
unsigned int statsIntervalInSeconds)
: consumerStr_(consumerStr),
executor_(executor),
timer_(executor_->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
}
timer_(executor->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {}

ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
: consumerStr_(stats.consumerStr_),
Expand All @@ -63,17 +59,13 @@ void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
ackedMsgMap_.clear();
lock.unlock();

timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
scheduleTimer();
LOG_INFO(tmp);
}

ConsumerStatsImpl::~ConsumerStatsImpl() {
Lock lock(mutex_);
if (timer_) {
timer_->cancel();
}
}
ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); }

void ConsumerStatsImpl::start() { scheduleTimer(); }

void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) {
Lock lock(mutex_);
Expand All @@ -91,6 +83,18 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
totalAckedMsgMap_[std::make_pair(res, ackType)] += ackNums;
}

void ConsumerStatsImpl::scheduleTimer() {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
flushAndReset(ec);
});
}

std::ostream& operator<<(std::ostream& os,
const std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& m) {
os << "{";
Expand Down
12 changes: 7 additions & 5 deletions lib/stats/ConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <boost/asio/deadline_timer.hpp>
#include <map>
#include <memory>
#include <mutex>
#include <utility>

Expand All @@ -33,7 +34,7 @@ using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;

class ConsumerStatsImpl : public ConsumerStatsBase {
class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>, public ConsumerStatsBase {
private:
std::string consumerStr_;

Expand All @@ -45,11 +46,11 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
std::map<Result, unsigned long> totalReceivedMsgMap_;
std::map<std::pair<Result, CommandAck_AckType>, unsigned long> totalAckedMsgMap_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
const DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;

void scheduleTimer();
friend std::ostream& operator<<(std::ostream&, const ConsumerStatsImpl&);
friend std::ostream& operator<<(std::ostream&, const std::map<Result, unsigned long>&);
friend class PulsarFriend;
Expand All @@ -58,8 +59,9 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int);
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const boost::system::error_code&);
virtual void receivedMessage(Message&, Result);
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1);
void start() override;
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
virtual ~ConsumerStatsImpl();

const inline std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& getAckedMsgMap() const {
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ProducerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace pulsar {
class ProducerStatsBase {
public:
virtual void start() {}
virtual void messageSent(const Message& msg) = 0;
virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
virtual ~ProducerStatsBase(){};
Expand Down
28 changes: 16 additions & 12 deletions lib/stats/ProducerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, ExecutorServicePtr
: producerStr_(producerStr),
latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
executor_(executor),
timer_(executor->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
}
statsIntervalInSeconds_(statsIntervalInSeconds) {}

ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
: producerStr_(stats.producerStr_),
Expand All @@ -67,6 +63,8 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
totalLatencyAccumulator_(stats.totalLatencyAccumulator_),
statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}

void ProducerStatsImpl::start() { scheduleTimer(); }

void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
Expand All @@ -82,8 +80,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
LatencyAccumulator(boost::accumulators::tag::extended_p_square::probabilities = probs);
lock.unlock();

timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
scheduleTimer();
LOG_INFO(tmp);
}

Expand All @@ -105,11 +102,18 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti
totalSendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor
}

ProducerStatsImpl::~ProducerStatsImpl() {
Lock lock(mutex_);
if (timer_) {
timer_->cancel();
}
ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }

void ProducerStatsImpl::scheduleTimer() {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
flushAndReset(ec);
});
}

std::ostream& operator<<(std::ostream& os, const ProducerStatsImpl& obj) {
Expand Down
11 changes: 7 additions & 4 deletions lib/stats/ProducerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
std::map<Result, unsigned long> totalSendMap_;
LatencyAccumulator totalLatencyAccumulator_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
const DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;

Expand All @@ -75,16 +74,20 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>

static std::string latencyToString(const LatencyAccumulator&);

void scheduleTimer();

public:
ProducerStatsImpl(std::string, ExecutorServicePtr, unsigned int);

ProducerStatsImpl(const ProducerStatsImpl& stats);

void start() override;

void flushAndReset(const boost::system::error_code&);

void messageSent(const Message&);
void messageSent(const Message&) override;

void messageReceived(Result, const boost::posix_time::ptime&);
void messageReceived(Result, const boost::posix_time::ptime&) override;

~ProducerStatsImpl();

Expand Down

0 comments on commit 8242c4b

Please sign in to comment.