From 8242c4b603a6467241dc676c6c0310349d012240 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 20 Mar 2023 13:17:32 +0800 Subject: [PATCH] Make stats timers thread safe to use ### Motivation The timer callbacks in `ProducerStatsImpl` and `ConsumerStatsImpl` are not thread safe: each captures the raw `this` pointer, so by the time the callback runs in the event loop, `this` might point to an already destroyed instance. ### Modifications - Capture a weak pointer in these callbacks and return early when it has expired, instead of relying on a bare `this`. Since we cannot call `shared_from_this()` in the constructor, a `start()` method is added to schedule the first timer. - Remove the useless `executor_` field and the unnecessary null check for `timer_`. --- lib/ConsumerImpl.cc | 1 + lib/ProducerImpl.cc | 1 + lib/stats/ConsumerStatsBase.h | 1 + lib/stats/ConsumerStatsImpl.cc | 32 ++++++++++++++++++-------------- lib/stats/ConsumerStatsImpl.h | 12 +++++++----- lib/stats/ProducerStatsBase.h | 1 + lib/stats/ProducerStatsImpl.cc | 28 ++++++++++++++++------------ lib/stats/ProducerStatsImpl.h | 11 +++++++---- 8 files changed, 52 insertions(+), 35 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 583cbcc0..3393faf5 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -117,6 +117,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, } else { consumerStatsBasePtr_ = std::make_shared(); } + consumerStatsBasePtr_->start(); // Create msgCrypto if (conf.isEncryptionEnabled()) { diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 2dbd6c8c..0b7f482d 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -98,6 +98,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, } else { producerStatsBasePtr_ = std::make_shared(); } + producerStatsBasePtr_->start(); if (conf_.isEncryptionEnabled()) { std::ostringstream logCtxStream; diff --git a/lib/stats/ConsumerStatsBase.h b/lib/stats/ConsumerStatsBase.h index cc4a1595..6e2e71b8 100644 --- 
a/lib/stats/ConsumerStatsBase.h +++ b/lib/stats/ConsumerStatsBase.h @@ -27,6 +27,7 @@ namespace pulsar { class ConsumerStatsBase { public: + virtual void start() {} virtual void receivedMessage(Message&, Result) = 0; virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0; virtual ~ConsumerStatsBase() {} diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 4f3b0fda..a2727c9b 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -33,12 +33,8 @@ using Lock = std::unique_lock; ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, ExecutorServicePtr executor, unsigned int statsIntervalInSeconds) : consumerStr_(consumerStr), - executor_(executor), - timer_(executor_->createDeadlineTimer()), - statsIntervalInSeconds_(statsIntervalInSeconds) { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); - timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1)); -} + timer_(executor->createDeadlineTimer()), + statsIntervalInSeconds_(statsIntervalInSeconds) {} ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats) : consumerStr_(stats.consumerStr_), @@ -63,17 +59,13 @@ void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) { ackedMsgMap_.clear(); lock.unlock(); - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); - timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1)); + scheduleTimer(); LOG_INFO(tmp); } -ConsumerStatsImpl::~ConsumerStatsImpl() { - Lock lock(mutex_); - if (timer_) { - timer_->cancel(); - } -} +ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); } + +void ConsumerStatsImpl::start() { scheduleTimer(); } void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) { Lock lock(mutex_); @@ -91,6 +83,18 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType 
ackTy totalAckedMsgMap_[std::make_pair(res, ackType)] += ackNums; } +void ConsumerStatsImpl::scheduleTimer() { + timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + std::weak_ptr weakSelf{shared_from_this()}; + timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + flushAndReset(ec); + }); +} + std::ostream& operator<<(std::ostream& os, const std::map, unsigned long>& m) { os << "{"; diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index 1309ff6b..44f927f5 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -33,7 +34,7 @@ using DeadlineTimerPtr = std::shared_ptr; class ExecutorService; using ExecutorServicePtr = std::shared_ptr; -class ConsumerStatsImpl : public ConsumerStatsBase { +class ConsumerStatsImpl : public std::enable_shared_from_this, public ConsumerStatsBase { private: std::string consumerStr_; @@ -45,11 +46,11 @@ class ConsumerStatsImpl : public ConsumerStatsBase { std::map totalReceivedMsgMap_; std::map, unsigned long> totalAckedMsgMap_; - ExecutorServicePtr executor_; - DeadlineTimerPtr timer_; + const DeadlineTimerPtr timer_; std::mutex mutex_; unsigned int statsIntervalInSeconds_; + void scheduleTimer(); friend std::ostream& operator<<(std::ostream&, const ConsumerStatsImpl&); friend std::ostream& operator<<(std::ostream&, const std::map&); friend class PulsarFriend; @@ -58,8 +59,9 @@ class ConsumerStatsImpl : public ConsumerStatsBase { ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int); ConsumerStatsImpl(const ConsumerStatsImpl& stats); void flushAndReset(const boost::system::error_code&); - virtual void receivedMessage(Message&, Result); - virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1); + void start() override; + void receivedMessage(Message&, Result) override; + void 
messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; virtual ~ConsumerStatsImpl(); const inline std::map, unsigned long>& getAckedMsgMap() const { diff --git a/lib/stats/ProducerStatsBase.h b/lib/stats/ProducerStatsBase.h index aafc8774..fe0ba0a5 100644 --- a/lib/stats/ProducerStatsBase.h +++ b/lib/stats/ProducerStatsBase.h @@ -27,6 +27,7 @@ namespace pulsar { class ProducerStatsBase { public: + virtual void start() {} virtual void messageSent(const Message& msg) = 0; virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0; virtual ~ProducerStatsBase(){}; diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc index f30aebb4..ad7bec4e 100644 --- a/lib/stats/ProducerStatsImpl.cc +++ b/lib/stats/ProducerStatsImpl.cc @@ -48,12 +48,8 @@ ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, ExecutorServicePtr : producerStr_(producerStr), latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs), totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs), - executor_(executor), timer_(executor->createDeadlineTimer()), - statsIntervalInSeconds_(statsIntervalInSeconds) { - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); - timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1)); -} + statsIntervalInSeconds_(statsIntervalInSeconds) {} ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats) : producerStr_(stats.producerStr_), @@ -67,6 +63,8 @@ ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats) totalLatencyAccumulator_(stats.totalLatencyAccumulator_), statsIntervalInSeconds_(stats.statsIntervalInSeconds_) {} +void ProducerStatsImpl::start() { scheduleTimer(); } + void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); @@ -82,8 +80,7 @@ void 
ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) { LatencyAccumulator(boost::accumulators::tag::extended_p_square::probabilities = probs); lock.unlock(); - timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); - timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1)); + scheduleTimer(); LOG_INFO(tmp); } @@ -105,11 +102,18 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti totalSendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor } -ProducerStatsImpl::~ProducerStatsImpl() { - Lock lock(mutex_); - if (timer_) { - timer_->cancel(); - } +ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } + +void ProducerStatsImpl::scheduleTimer() { + timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_)); + std::weak_ptr weakSelf{shared_from_this()}; + timer_->async_wait([this, weakSelf](const boost::system::error_code& ec) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + flushAndReset(ec); + }); } std::ostream& operator<<(std::ostream& os, const ProducerStatsImpl& obj) { diff --git a/lib/stats/ProducerStatsImpl.h b/lib/stats/ProducerStatsImpl.h index a826ef00..8cd10992 100644 --- a/lib/stats/ProducerStatsImpl.h +++ b/lib/stats/ProducerStatsImpl.h @@ -64,8 +64,7 @@ class ProducerStatsImpl : public std::enable_shared_from_this std::map totalSendMap_; LatencyAccumulator totalLatencyAccumulator_; - ExecutorServicePtr executor_; - DeadlineTimerPtr timer_; + const DeadlineTimerPtr timer_; std::mutex mutex_; unsigned int statsIntervalInSeconds_; @@ -75,16 +74,20 @@ class ProducerStatsImpl : public std::enable_shared_from_this static std::string latencyToString(const LatencyAccumulator&); + void scheduleTimer(); + public: ProducerStatsImpl(std::string, ExecutorServicePtr, unsigned int); ProducerStatsImpl(const ProducerStatsImpl& stats); + void start() override; + void flushAndReset(const 
boost::system::error_code&); - void messageSent(const Message&); + void messageSent(const Message&) override; - void messageReceived(Result, const boost::posix_time::ptime&); + void messageReceived(Result, const boost::posix_time::ptime&) override; ~ProducerStatsImpl();