Skip to content

Commit

Permalink
Avoid calling cancel() in destructors
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Mar 20, 2023
1 parent 8242c4b commit d5b000a
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ void ConsumerImpl::shutdown() {
client->cleanupConsumer(this);
}
negativeAcksTracker_.close();
consumerStatsBasePtr_->close();
cancelTimers();
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
failPendingReceiveCallback();
Expand Down
1 change: 1 addition & 0 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ void ProducerImpl::shutdown() {
if (client) {
client->cleanupProducer(this);
}
producerStatsBasePtr_->close();
cancelTimers();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
state_ = Closed;
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ConsumerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace pulsar {
class ConsumerStatsBase {
public:
virtual void start() {}
virtual void close() {}
virtual void receivedMessage(Message&, Result) = 0;
virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0;
virtual ~ConsumerStatsBase() {}
Expand Down
7 changes: 6 additions & 1 deletion lib/stats/ConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,15 @@ void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
LOG_INFO(tmp);
}

ConsumerStatsImpl::~ConsumerStatsImpl() { timer_->cancel(); }
ConsumerStatsImpl::~ConsumerStatsImpl() {}

void ConsumerStatsImpl::start() { scheduleTimer(); }

void ConsumerStatsImpl::close() {
boost::system::error_code ec;
timer_->cancel(ec);
}

void ConsumerStatsImpl::receivedMessage(Message& msg, Result res) {
Lock lock(mutex_);
if (res == ResultOk) {
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl>
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const boost::system::error_code&);
void start() override;
void close() override;
void receivedMessage(Message&, Result) override;
void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override;
virtual ~ConsumerStatsImpl();
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ProducerStatsBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace pulsar {
class ProducerStatsBase {
public:
virtual void start() {}
virtual void close() {}
virtual void messageSent(const Message& msg) = 0;
virtual void messageReceived(Result, const boost::posix_time::ptime&) = 0;
virtual ~ProducerStatsBase(){};
Expand Down
7 changes: 6 additions & 1 deletion lib/stats/ProducerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ void ProducerStatsImpl::messageReceived(Result res, const boost::posix_time::pti
totalSendMap_[res] += 1; // Value will automatically be initialized to 0 in the constructor
}

ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); }
ProducerStatsImpl::~ProducerStatsImpl() {}

void ProducerStatsImpl::close() {
boost::system::error_code ec;
timer_->cancel(ec);
}

void ProducerStatsImpl::scheduleTimer() {
timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
Expand Down
1 change: 1 addition & 0 deletions lib/stats/ProducerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
ProducerStatsImpl(const ProducerStatsImpl& stats);

void start() override;
void close() override;

void flushAndReset(const boost::system::error_code&);

Expand Down

0 comments on commit d5b000a

Please sign in to comment.