Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make stats timers thread safe to use #223

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
} else {
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>();
}
consumerStatsBasePtr_->start();

// Create msgCrypto
if (conf.isEncryptionEnabled()) {
Expand Down
1 change: 1 addition & 0 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
} else {
producerStatsBasePtr_ = std::make_shared<ProducerStatsDisabled>();
}
producerStatsBasePtr_->start();

if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ConsumerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace pulsar {
class ConsumerStatsBase {
public:
virtual void start() {}
virtual void receivedMessage(Message&, Result) = 0;
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0;
virtual ~ConsumerStatsBase() {}
Expand Down
37 changes: 21 additions & 16 deletions lib/stats/ConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ using Lock = std::unique_lock<std::mutex>;
ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, ExecutorServicePtr executor,
unsigned int statsIntervalInSeconds)
: consumerStr_(consumerStr),
executor_(executor),
timer_(executor_->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
}
timer_(executor->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {}

ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
: consumerStr_(stats.consumerStr_),
Expand All @@ -57,23 +53,20 @@ void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
}

Lock lock(mutex_);
ConsumerStatsImpl tmp = *this;
std::ostringstream oss;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be removed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? We need to log the current instance. Before this PR, it was logged by copying the current instance, which is heavy and might cause unexpected destruction of the shared pointer.

oss << *this;
numBytesRecieved_ = 0;
receivedMsgMap_.clear();
ackedMsgMap_.clear();
lock.unlock();

timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
LOG_INFO(tmp);
scheduleTimer();
LOG_INFO(oss.str());
}

ConsumerStatsImpl::~ConsumerStatsImpl() {
Lock lock(mutex_);
if (timer_) {
timer_->cancel();
}
}
ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); }

void ConsumerStatsImpl::start() { scheduleTimer(); }

void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) {
Lock lock(mutex_);
Expand All @@ -91,6 +84,18 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy
totalAckedMsgMap_[std::make_pair(res, ackType)] += ackNums;
}

void ConsumerStatsImpl::scheduleTimer() {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
std::weak_ptr<ConsumerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
flushAndReset(ec);
});
}

std::ostream& operator<<(std::ostream& os,
const std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& m) {
os << "{";
Expand Down
12 changes: 7 additions & 5 deletions lib/stats/ConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <boost/asio/deadline_timer.hpp>
#include <map>
#include <memory>
#include <mutex>
#include <utility>

Expand All @@ -33,7 +34,7 @@ using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;

class ConsumerStatsImpl : public ConsumerStatsBase {
class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>, public ConsumerStatsBase {
private:
std::string consumerStr_;

Expand All @@ -45,11 +46,11 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
std::map<Result, unsigned long> totalReceivedMsgMap_;
std::map<std::pair<Result, CommandAck_AckType>, unsigned long> totalAckedMsgMap_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
const DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;

void scheduleTimer();
friend std::ostream& operator<<(std::ostream&, const ConsumerStatsImpl&);
friend std::ostream& operator<<(std::ostream&, const std::map<Result, unsigned long>&);
friend class PulsarFriend;
Expand All @@ -58,8 +59,9 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int);
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const boost::system::error_code&);
virtual void receivedMessage(Message&, Result);
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1);
void start() override;
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
virtual ~ConsumerStatsImpl();

const inline std::map<std::pair<Result, CommandAck_AckType>, unsigned long>& getAckedMsgMap() const {
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ProducerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace pulsar {
class ProducerStatsBase {
public:
virtual void start() {}
virtual void messageSent(const Message& msg) = 0;
virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
virtual ~ProducerStatsBase(){};
Expand Down
33 changes: 19 additions & 14 deletions lib/stats/ProducerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, ExecutorServicePtr
: producerStr_(producerStr),
latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
executor_(executor),
timer_(executor->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds) {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
}
statsIntervalInSeconds_(statsIntervalInSeconds) {}

ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
: producerStr_(stats.producerStr_),
Expand All @@ -67,24 +63,26 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
totalLatencyAccumulator_(stats.totalLatencyAccumulator_),
statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {}

void ProducerStatsImpl::start() { scheduleTimer(); }

void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
return;
}

Lock lock(mutex_);
ProducerStatsImpl tmp = *this;
std::ostringstream oss;
oss << *this;
numMsgsSent_ = 0;
numBytesSent_ = 0;
sendMap_.clear();
latencyAccumulator_ =
LatencyAccumulator(boost::accumulators::tag::extended_p_square::probabilities = probs);
lock.unlock();

timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
LOG_INFO(tmp);
scheduleTimer();
LOG_INFO(oss.str());
}

void ProducerStatsImpl::messageSent(const Message& msg) {
Expand All @@ -105,11 +103,18 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti
totalSendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor
}

ProducerStatsImpl::~ProducerStatsImpl() {
Lock lock(mutex_);
if (timer_) {
timer_->cancel();
}
ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }

void ProducerStatsImpl::scheduleTimer() {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
std::weak_ptr<ProducerStatsImpl> weakSelf{shared_from_this()};
timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (!self) {
return;
}
flushAndReset(ec);
});
}

std::ostream& operator<<(std::ostream& os, const ProducerStatsImpl& obj) {
Expand Down
11 changes: 7 additions & 4 deletions lib/stats/ProducerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
std::map<Result, unsigned long> totalSendMap_;
LatencyAccumulator totalLatencyAccumulator_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
const DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;

Expand All @@ -75,16 +74,20 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>

static std::string latencyToString(const LatencyAccumulator&);

void scheduleTimer();

public:
ProducerStatsImpl(std::string, ExecutorServicePtr, unsigned int);

ProducerStatsImpl(const ProducerStatsImpl& stats);

void start() override;

void flushAndReset(const boost::system::error_code&);

void messageSent(const Message&);
void messageSent(const Message&) override;

void messageReceived(Result, const boost::posix_time::ptime&);
void messageReceived(Result, const boost::posix_time::ptime&) override;

~ProducerStatsImpl();

Expand Down