diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 5116fd04..e3f9387b 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -40,11 +40,14 @@ class OpBatchReceive { const int64_t createAt_; }; -class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this { +class ConsumerImplBase : public HandlerBase { public: virtual ~ConsumerImplBase(){}; ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff, const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor); + std::shared_ptr shared_from_this() noexcept { + return std::dynamic_pointer_cast(HandlerBase::shared_from_this()); + } // interface by consumer virtual Future getConsumerCreatedFuture() = 0; @@ -83,7 +86,6 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this // overrided methods from HandlerBase void connectionOpened(const ClientConnectionPtr& cnx) override {} void connectionFailed(Result result) override {} - HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); } // consumer impl generic method. ExecutorServicePtr listenerExecutor_; diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 7c2d1eef..163c7790 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -84,25 +84,18 @@ void HandlerBase::grabCnx() { connectionFailed(ResultConnectError); return; } - auto weakSelf = get_weak_from_this(); - client->getConnection(*topic_).addListener( - [this, weakSelf](Result result, const ClientConnectionPtr& cnx) { - auto self = weakSelf.lock(); - if (!self) { - LOG_DEBUG("HandlerBase Weak reference is not valid anymore"); - return; - } - - reconnectionPending_ = false; - - if (result == ResultOk) { - LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString()); - connectionOpened(cnx); - } else { - connectionFailed(result); - scheduleReconnection(); - } - }); + auto self = shared_from_this(); + client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) { + reconnectionPending_ = false; + + if (result == ResultOk) { + LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString()); + connectionOpened(cnx); + } else { + connectionFailed(result); + scheduleReconnection(); + } + }); } void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) { @@ -148,11 +141,14 @@ void HandlerBase::scheduleReconnection() { timer_->expires_from_now(delay); // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled // so we will not run into the case where grabCnx is invoked on out of scope handler - auto weakSelf = get_weak_from_this(); - timer_->async_wait([weakSelf](const boost::system::error_code& ec) { + auto name = getName(); + std::weak_ptr weakSelf{shared_from_this()}; + timer_->async_wait([name, weakSelf](const boost::system::error_code& ec) { auto self = weakSelf.lock(); if (self) { self->handleTimeout(ec); + } else { + LOG_WARN(name << "Cancel the reconnection since the handler is destroyed"); } }); } diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 2cd8dd1b..71eaafec 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -33,9 +33,6 @@ using namespace boost::posix_time; using boost::posix_time::milliseconds; using boost::posix_time::seconds; -class HandlerBase; -typedef std::weak_ptr HandlerBaseWeakPtr; -typedef std::shared_ptr HandlerBasePtr; class ClientImpl; using ClientImplPtr = std::shared_ptr; using ClientImplWeakPtr = std::weak_ptr; @@ -46,7 +43,7 @@ class ExecutorService; using ExecutorServicePtr = std::shared_ptr; using DeadlineTimerPtr = std::shared_ptr; -class HandlerBase { +class HandlerBase : public std::enable_shared_from_this { public: HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&); @@ -84,8 +81,6 @@ class HandlerBase { virtual void connectionFailed(Result result) = 0; - virtual HandlerBaseWeakPtr get_weak_from_this() = 0; - virtual const std::string& getName() const = 0; private: diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index bdcc757d..6b3f5cb8 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -1010,7 +1010,5 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { }); } -ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); } - } // namespace pulsar /* namespace pulsar */ diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 8611bfef..aa4ba35a 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -60,9 +60,7 @@ namespace proto { class MessageMetadata; } // namespace proto -class ProducerImpl : public HandlerBase, - public std::enable_shared_from_this, - public ProducerImplBase { +class ProducerImpl : public HandlerBase, public ProducerImplBase { public: ProducerImpl(ClientImplPtr client, const TopicName& topic, const ProducerConfiguration& producerConfiguration, @@ -98,8 +96,11 @@ class ProducerImpl : public HandlerBase, static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize); - // NOTE: this method is introduced into `enable_shared_from_this` since C++17 - ProducerImplWeakPtr weak_from_this() noexcept; + ProducerImplPtr shared_from_this() noexcept { + return std::dynamic_pointer_cast(HandlerBase::shared_from_this()); + } + + ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); } protected: ProducerStatsBasePtr producerStatsBasePtr_; @@ -121,7 +122,6 @@ class ProducerImpl : public HandlerBase, void beforeConnectionChange(ClientConnection& connection) override; void connectionOpened(const ClientConnectionPtr& connection) override; void connectionFailed(Result result) override; - HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); } const std::string& getName() const override { return producerStr_; } private: