diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 7a363bc1..d65fe54c 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -218,10 +218,14 @@ void ConsumerImpl::onNegativeAcksSend(const std::set& messageIds) { interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds); } -void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { +Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { + // Do not use bool, only Result. + Promise promise; + if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed"); - return; + promise.setFailed(ResultAlreadyClosed); + return promise.getFuture(); } // Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after @@ -249,9 +253,20 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(), config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(), config_.getPriorityLevel()); + + // Keep a reference to ensure object is kept alive. + auto self = get_shared_this_ptr(); cnx->sendRequestWithId(cmd, requestId) - .addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx, - std::placeholders::_1)); + .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { + Result handleResult = handleCreateConsumer(cnx, result); + if (handleResult == ResultOk) { + promise.setSuccess(); + } else { + promise.setFailed(handleResult); + } + }); + + return promise.getFuture(); } void ConsumerImpl::connectionFailed(Result result) { @@ -271,7 +286,9 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n } } -void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { +Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { + Result handleResult = ResultOk; + static bool firstTime = true; if (result == ResultOk) { if (firstTime) { @@ -313,20 +330,21 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r if (consumerCreatedPromise_.isComplete()) { // Consumer had already been initially created, we need to retry connecting in any case LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result)); - scheduleReconnection(); + handleResult = ResultRetryable; } else { // Consumer was not yet created, retry to connect to broker if it's possible - result = convertToTimeoutIfNecessary(result, creationTimestamp_); - if (isResultRetryable(result)) { - LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result)); - scheduleReconnection(); + handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (isResultRetryable(handleResult)) { + LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult)); } else { - LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result)); - consumerCreatedPromise_.setFailed(result); + LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult)); + consumerCreatedPromise_.setFailed(handleResult); state_ = Failed; } } } + + return handleResult; } void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 7ca39799..8d5bdcfb 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -142,14 +142,14 @@ class ConsumerImpl : public ConsumerImplBase { protected: // overrided methods from HandlerBase - void connectionOpened(const ClientConnectionPtr& cnx) override; + Future connectionOpened(const ClientConnectionPtr& cnx) override; void connectionFailed(Result result) override; // impl methods from ConsumerImpl base bool hasEnoughMessagesForBatchReceive() const override; void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; - void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); + Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); void internalListener(); diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index e3f9387b..2f3420c6 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -84,7 +84,12 @@ class ConsumerImplBase : public HandlerBase { protected: // overrided methods from HandlerBase - void connectionOpened(const ClientConnectionPtr& cnx) override {} + Future connectionOpened(const ClientConnectionPtr& cnx) override { + // Do not use bool, only Result. + Promise promise; + promise.setSuccess(); + return promise.getFuture(); + } void connectionFailed(Result result) override {} // consumer impl generic method. diff --git a/lib/Future.h b/lib/Future.h index 03e93e46..5ee937ee 100644 --- a/lib/Future.h +++ b/lib/Future.h @@ -138,6 +138,8 @@ class Promise { bool setFailed(Result result) const { return state_->complete(result, {}); } + bool setSuccess() const { return setValue({}); } + bool isComplete() const { return state_->completed(); } Future getFuture() const { return Future{state_}; } diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index 14f5601b..b9299395 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -67,14 +67,15 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) { } void HandlerBase::grabCnx() { - if (getCnx().lock()) { - LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); + bool expectedState = false; + if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { + LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); return; } - bool expectedState = false; - if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { - LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); + if (getCnx().lock()) { + LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); + reconnectionPending_ = false; return; } @@ -83,17 +84,23 @@ void HandlerBase::grabCnx() { if (!client) { LOG_WARN(getName() << "Client is invalid when calling grabCnx()"); connectionFailed(ResultConnectError); + reconnectionPending_ = false; return; } auto self = shared_from_this(); client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) { - reconnectionPending_ = false; - if (result == ResultOk) { LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString()); - connectionOpened(cnx); + connectionOpened(cnx).addListener([this, self](Result result, bool) { + // Do not use bool, only Result. + reconnectionPending_ = false; + if (isResultRetryable(result)) { + scheduleReconnection(); + } + }); } else { connectionFailed(result); + reconnectionPending_ = false; scheduleReconnection(); } }); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 71eaafec..ad16a220 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -26,6 +26,7 @@ #include #include "Backoff.h" +#include "Future.h" namespace pulsar { @@ -74,10 +75,13 @@ class HandlerBase : public std::enable_shared_from_this { virtual void beforeConnectionChange(ClientConnection& cnx) = 0; /* - * connectionOpened will be implemented by derived class to receive notification + * connectionOpened will be implemented by derived class to receive notification. + * + * @return ResultOk if the connection is successfully completed. + * @return ResultError if there was a failure. ResultRetryable if reconnection is needed. + * @return Do not use bool, only Result. */ - - virtual void connectionOpened(const ClientConnectionPtr& connection) = 0; + virtual Future connectionOpened(const ClientConnectionPtr& connection) = 0; virtual void connectionFailed(Result result) = 0; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 8bd14f23..41595dd9 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -135,10 +135,14 @@ void ProducerImpl::beforeConnectionChange(ClientConnection& connection) { connection.removeProducer(producerId_); } -void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { +Future ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { + // Do not use bool, only Result. + Promise promise; + if (state_ == Closed) { LOG_DEBUG(getName() << "connectionOpened : Producer is already closed"); - return; + promise.setFailed(ResultAlreadyClosed); + return promise.getFuture(); } ClientImplPtr client = client_.lock(); @@ -149,9 +153,20 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { userProvidedProducerName_, conf_.isEncryptionEnabled(), static_cast(conf_.getAccessMode()), topicEpoch, conf_.impl_->initialSubscriptionName); + + // Keep a reference to ensure object is kept alive. + auto self = shared_from_this(); cnx->sendRequestWithId(cmd, requestId) - .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, - std::placeholders::_1, std::placeholders::_2)); + .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { + Result handleResult = handleCreateProducer(cnx, result, responseData); + if (handleResult == ResultOk) { + promise.setSuccess(); + } else { + promise.setFailed(handleResult); + } + }); + + return promise.getFuture(); } void ProducerImpl::connectionFailed(Result result) { @@ -167,8 +182,10 @@ void ProducerImpl::connectionFailed(Result result) { } } -void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result, - const ResponseData& responseData) { +Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData) { + Result handleResult = ResultOk; + Lock lock(mutex_); LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result)); @@ -190,7 +207,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r lock.unlock(); producerCreatedPromise_.setFailed(ResultAlreadyClosed); } - return; + return ResultAlreadyClosed; } if (result == ResultOk) { @@ -259,6 +276,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r } lock.unlock(); producerCreatedPromise_.setFailed(result); + handleResult = result; } else if (producerCreatedPromise_.isComplete()) { if (result == ResultProducerBlockedQuotaExceededException) { LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer"); @@ -269,22 +287,23 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r // Producer had already been initially created, we need to retry connecting in any case LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result)); - scheduleReconnection(); + handleResult = ResultRetryable; } else { // Producer was not yet created, retry to connect to broker if it's possible - result = convertToTimeoutIfNecessary(result, creationTimestamp_); - if (isResultRetryable(result)) { - LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result)); - scheduleReconnection(); + handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_); + if (isResultRetryable(handleResult)) { + LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult)); } else { - LOG_ERROR(getName() << "Failed to create producer: " << strResult(result)); - failPendingMessages(result, false); + LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult)); + failPendingMessages(handleResult, false); state_ = Failed; lock.unlock(); - producerCreatedPromise_.setFailed(result); + producerCreatedPromise_.setFailed(handleResult); } } } + + return handleResult; } auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) { diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index aa4ba35a..770ac45f 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -120,15 +120,15 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { // overrided methods from HandlerBase void beforeConnectionChange(ClientConnection& connection) override; - void connectionOpened(const ClientConnectionPtr& connection) override; + Future connectionOpened(const ClientConnectionPtr& connection) override; void connectionFailed(Result result) override; const std::string& getName() const override { return producerStr_; } private: void printStats(); - void handleCreateProducer(const ClientConnectionPtr& cnx, Result result, - const ResponseData& responseData); + Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData); void resendMessages(ClientConnectionPtr cnx);