Skip to content

Commit

Permalink
Fix the handler instance is expired when the connection is established (
Browse files Browse the repository at this point in the history
#323)

### Motivation

We observed some logs that showed the handler instance is expired when
the connection is established after a reconnection:

```
HandlerBase Weak reference is not valid anymore
```

https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/HandlerBase.cc#L92

### Modifications

Pass a `shared_ptr` instead of a `weak_ptr` in `HandlerBase::grabCnx` to
ensure the `connectionOpened` or `connectionFailed` callback is called
if `ClientImpl::getConnection` is called. We only need to pass a
`weak_ptr` in `scheduleReconnection` to skip the reconnection if the
handler is expired.
  • Loading branch information
BewareMyPower authored Oct 4, 2023
1 parent b35ae1a commit af45a54
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 37 deletions.
6 changes: 4 additions & 2 deletions lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ class OpBatchReceive {
const int64_t createAt_;
};

class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this<ConsumerImplBase> {
class ConsumerImplBase : public HandlerBase {
public:
virtual ~ConsumerImplBase(){};
ConsumerImplBase(ClientImplPtr client, const std::string& topic, Backoff backoff,
const ConsumerConfiguration& conf, ExecutorServicePtr listenerExecutor);
std::shared_ptr<ConsumerImplBase> shared_from_this() noexcept {
return std::dynamic_pointer_cast<ConsumerImplBase>(HandlerBase::shared_from_this());
}

// interface by consumer
virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() = 0;
Expand Down Expand Up @@ -83,7 +86,6 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override {}
void connectionFailed(Result result) override {}
HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }

// consumer impl generic method.
ExecutorServicePtr listenerExecutor_;
Expand Down
38 changes: 17 additions & 21 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,18 @@ void HandlerBase::grabCnx() {
connectionFailed(ResultConnectError);
return;
}
auto weakSelf = get_weak_from_this();
client->getConnection(*topic_).addListener(
[this, weakSelf](Result result, const ClientConnectionPtr& cnx) {
auto self = weakSelf.lock();
if (!self) {
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
return;
}

reconnectionPending_ = false;

if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx);
} else {
connectionFailed(result);
scheduleReconnection();
}
});
auto self = shared_from_this();
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
reconnectionPending_ = false;

if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx);
} else {
connectionFailed(result);
scheduleReconnection();
}
});
}

void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) {
Expand Down Expand Up @@ -148,11 +141,14 @@ void HandlerBase::scheduleReconnection() {
timer_->expires_from_now(delay);
// passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
// so we will not run into the case where grabCnx is invoked on out of scope handler
auto weakSelf = get_weak_from_this();
timer_->async_wait([weakSelf](const boost::system::error_code& ec) {
auto name = getName();
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
timer_->async_wait([name, weakSelf](const boost::system::error_code& ec) {
auto self = weakSelf.lock();
if (self) {
self->handleTimeout(ec);
} else {
LOG_WARN(name << "Cancel the reconnection since the handler is destroyed");
}
});
}
Expand Down
7 changes: 1 addition & 6 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ using namespace boost::posix_time;
using boost::posix_time::milliseconds;
using boost::posix_time::seconds;

class HandlerBase;
typedef std::weak_ptr<HandlerBase> HandlerBaseWeakPtr;
typedef std::shared_ptr<HandlerBase> HandlerBasePtr;
class ClientImpl;
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
Expand All @@ -46,7 +43,7 @@ class ExecutorService;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;

class HandlerBase {
class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
public:
HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&);

Expand Down Expand Up @@ -84,8 +81,6 @@ class HandlerBase {

virtual void connectionFailed(Result result) = 0;

virtual HandlerBaseWeakPtr get_weak_from_this() = 0;

virtual const std::string& getName() const = 0;

private:
Expand Down
2 changes: 0 additions & 2 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,5 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
});
}

ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }

} // namespace pulsar
/* namespace pulsar */
12 changes: 6 additions & 6 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ namespace proto {
class MessageMetadata;
} // namespace proto

class ProducerImpl : public HandlerBase,
public std::enable_shared_from_this<ProducerImpl>,
public ProducerImplBase {
class ProducerImpl : public HandlerBase, public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const TopicName& topic,
const ProducerConfiguration& producerConfiguration,
Expand Down Expand Up @@ -98,8 +96,11 @@ class ProducerImpl : public HandlerBase,

static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize);

// NOTE: this method is introduced into `enable_shared_from_this` since C++17
ProducerImplWeakPtr weak_from_this() noexcept;
ProducerImplPtr shared_from_this() noexcept {
return std::dynamic_pointer_cast<ProducerImpl>(HandlerBase::shared_from_this());
}

ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); }

protected:
ProducerStatsBasePtr producerStatsBasePtr_;
Expand All @@ -121,7 +122,6 @@ class ProducerImpl : public HandlerBase,
void beforeConnectionChange(ClientConnection& connection) override;
void connectionOpened(const ClientConnectionPtr& connection) override;
void connectionFailed(Result result) override;
HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
const std::string& getName() const override { return producerStr_; }

private:
Expand Down

0 comments on commit af45a54

Please sign in to comment.