Skip to content

Commit

Permalink
Delay the timing of setting reconnectionPending to false to avoid dou…
Browse files Browse the repository at this point in the history
…ble attempt at reconnecting (apache#328)

Related Issue: apache#235

### Motivation
A potential double scheduling of reconnection due to a broker shutdown was observed.

The reconnect can be scheduled with either of the following codes
[ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L1209](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L1209)
or
[ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ClientConnection.cc#L1350](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ClientConnection.cc#L1350)
-> [ https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L121](https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L121)

If a second reconnection request is received during the first reconnection attempt, it triggers additional reconnection attempts. If the second reconnection is successful, the consumer is removed from `cnx`:
[ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L285](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L285)
-> [ https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L63](https://github.com/apache/pulsar-client-cpp/blob/af45a54c10ec5b06e80b683010afd3531457ac64/lib/HandlerBase.cc#L63)
--> [ https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L217](https://github.com/apache/pulsar-client-cpp/blob/b35ae1aa4b9834886c0889635de81834f9b2f774/lib/ConsumerImpl.cc#L217)

The problem is that the consumer will no longer be able to manage events coming from the broker.
To cope with this issue, a new flag `reconnectionPending_` has been introduced via apache#310 .

However, while the above change reduces the likelihood of the problem occurring, it doesn't eliminate the problem entirely.
In fact, the double reconnects have been observed even after apache#310(I tried with apache@b35ae1a):
```
# Consumer is connected to broker1, but broker1 shutdown closes Consumer and reconnection is scheduled.
...
2023-09-26 15:42:05.736 INFO  [140591970158336] ConsumerImpl:1207 | Broker notification of Closed consumer: 5
2023-09-26 15:42:05.736 INFO  [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.1 s
...

# Consumer attempts to connect to broker1, but fails, and a reconnection is scheduled again.
...
2023-09-26 15:42:05.836 INFO  [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool
2023-09-26 15:42:05.837 WARN  [140591970158336] ClientConnection:1741 | [<host(client)>:55304 -> <host(broker1)>:<prot>] Received error response from server: Retryable (Namespace is being unloaded, cannot add topic persistent://shustsud-test2/test/partitioned-topic-partition-5) -- req_id: 16
2023-09-26 15:42:05.837 WARN  [140591970158336] ConsumerImpl:317 | [0x7fde18046510, dummy_24, 5] Failed to reconnect consumer: Retryable
2023-09-26 15:42:05.837 INFO  [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.194 s
...

# During the connection attempt, the connection to broker1 is closed and further reconnection is scheduled.
# After that, two subscribe requests are sent to broker2.
2023-09-26 15:42:06.034 INFO  [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool
...
2023-09-26 15:42:06.515 ERROR [140591970158336] ClientConnection:1330 | [<host(client)>:55304 -> <host(broker1)>:<prot>] Connection closed with ConnectError
2023-09-26 15:42:06.515 INFO  [140591970158336] ConnectionPool:122 | Remove connection for pulsar+ssl://<host(broker1)>:<prot>
2023-09-26 15:42:06.515 INFO  [140591970158336] HandlerBase:147 | [0x7fde18046510, dummy_24, 5] Schedule reconnection in 0.392 s
...
2023-09-26 15:42:06.907 INFO  [140591970158336] HandlerBase:80 | [0x7fde18046510, dummy_24, 5] Getting connection from pool
...
2023-09-26 15:42:06.912 INFO  [140591970158336] ConsumerImpl:282 | [0x7fde18046510, dummy_24, 5] Created consumer on broker [<host(client)>:54582 -> <host(broker2)>:<prot>]
...
2023-09-26 15:42:07.103 INFO  [140591970158336] ConsumerImpl:282 | [0x7fde18046510, dummy_24, 5] Created consumer on broker [<host(client)>:54582 -> <host(broker2)>:<prot>]
...
```

To completely eliminate the possibility of the double reconnects, I suggest adjusting the timing of when reconnectionPending_ is set to false. Ideally, this should be done after the handleCreateConsumer method or the handleCreateProducer method has been completed.

### Modifications
The timing for setting `reconnectionPending_` to false has been changed.

(cherry picked from commit ebae92e)
  • Loading branch information
shustsud authored and BewareMyPower committed Oct 20, 2023
1 parent 799869c commit 9038327
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 44 deletions.
42 changes: 30 additions & 12 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,14 @@ void ConsumerImpl::onNegativeAcksSend(const std::set<MessageId>& messageIds) {
interceptors_->onNegativeAcksSend(Consumer(shared_from_this()), messageIds);
}

void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// Do not use bool, only Result.
Promise<Result, bool> promise;

if (state_ == Closed) {
LOG_WARN(getName() << "connectionOpened : Consumer is already closed");
return;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

// Register consumer so that we can handle other incomming commands (e.g. ACTIVE_CONSUMER_CHANGE) after
Expand Down Expand Up @@ -250,9 +254,20 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());

// Keep a reference to ensure object is kept alive.
auto self = get_shared_this_ptr();
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ConsumerImpl::handleCreateConsumer, get_shared_this_ptr(), cnx,
std::placeholders::_1));
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateConsumer(cnx, result);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
});

return promise.getFuture();
}

void ConsumerImpl::connectionFailed(Result result) {
Expand All @@ -272,8 +287,10 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
}
}

void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result handleResult = ResultOk;
LOG_WARN(getName() << "created consumer: " << result << ", " << cnx->cnxString());

static bool firstTime = true;
if (result == ResultOk) {
if (firstTime) {
Expand Down Expand Up @@ -315,20 +332,21 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
if (consumerCreatedPromise_.isComplete()) {
// Consumer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
scheduleReconnection();
handleResult = ResultRetryable;
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(result));
scheduleReconnection();
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(handleResult)) {
LOG_WARN(getName() << "Temporary error in creating consumer: " << strResult(handleResult));
} else {
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(result));
consumerCreatedPromise_.setFailed(result);
LOG_ERROR(getName() << "Failed to create consumer: " << strResult(handleResult));
consumerCreatedPromise_.setFailed(handleResult);
state_ = Failed;
}
}
}

return handleResult;
}

void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
Expand Down
4 changes: 2 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ class ConsumerImpl : public ConsumerImplBase {

protected:
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override;
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override;
void connectionFailed(Result result) override;

// impl methods from ConsumerImpl base
bool hasEnoughMessagesForBatchReceive() const override;
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;

void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);

void internalListener();

Expand Down
7 changes: 6 additions & 1 deletion lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ class ConsumerImplBase : public HandlerBase {

protected:
// overrided methods from HandlerBase
void connectionOpened(const ClientConnectionPtr& cnx) override {}
Future<Result, bool> connectionOpened(const ClientConnectionPtr& cnx) override {
// Do not use bool, only Result.
Promise<Result, bool> promise;
promise.setSuccess();
return promise.getFuture();
}
void connectionFailed(Result result) override {}

// consumer impl generic method.
Expand Down
2 changes: 2 additions & 0 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class Promise {

bool setFailed(Result result) const { return state_->complete(result, {}); }

bool setSuccess() const { return setValue({}); }

bool isComplete() const { return state_->completed(); }

Future<Result, Type> getFuture() const { return Future<Result, Type>{state_}; }
Expand Down
23 changes: 15 additions & 8 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,33 +67,40 @@ void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
}

void HandlerBase::grabCnx() {
if (getCnx().lock()) {
LOG_WARN(getName() << "Ignoring reconnection request since we're already connected");
return;
}

bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_WARN(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
return;
}

if (getCnx().lock()) {
LOG_WARN(getName() << "Ignoring reconnection request since we're already connected");
reconnectionPending_ = false;
return;
}

LOG_WARN(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
if (!client) {
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
connectionFailed(ResultConnectError);
reconnectionPending_ = false;
return;
}
auto self = shared_from_this();
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
reconnectionPending_ = false;

if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx);
connectionOpened(cnx).addListener([this, self](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (isResultRetryable(result)) {
scheduleReconnection();
}
});
} else {
connectionFailed(result);
reconnectionPending_ = false;
scheduleReconnection();
}
});
Expand Down
10 changes: 7 additions & 3 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <string>

#include "Backoff.h"
#include "Future.h"

namespace pulsar {

Expand Down Expand Up @@ -74,10 +75,13 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
virtual void beforeConnectionChange(ClientConnection& cnx) = 0;

/*
* connectionOpened will be implemented by derived class to receive notification
* connectionOpened will be implemented by derived class to receive notification.
*
* @return ResultOk if the connection is successfully completed.
* @return ResultError if there was a failure. ResultRetryable if reconnection is needed.
* @return Do not use bool, only Result.
*/

virtual void connectionOpened(const ClientConnectionPtr& connection) = 0;
virtual Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) = 0;

virtual void connectionFailed(Result result) = 0;

Expand Down
49 changes: 34 additions & 15 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,14 @@ void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
connection.removeProducer(producerId_);
}

void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
// Do not use bool, only Result.
Promise<Result, bool> promise;

if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
return;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}

ClientImplPtr client = client_.lock();
Expand All @@ -149,9 +153,20 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
userProvidedProducerName_, conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
topicEpoch, conf_.impl_->initialSubscriptionName);

// Keep a reference to ensure object is kept alive.
auto self = shared_from_this();
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
.addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) {
Result handleResult = handleCreateProducer(cnx, result, responseData);
if (handleResult == ResultOk) {
promise.setSuccess();
} else {
promise.setFailed(handleResult);
}
});

return promise.getFuture();
}

void ProducerImpl::connectionFailed(Result result) {
Expand All @@ -167,8 +182,10 @@ void ProducerImpl::connectionFailed(Result result) {
}
}

void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData) {
Result handleResult = ResultOk;

Lock lock(mutex_);

LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
Expand All @@ -190,7 +207,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
lock.unlock();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
return;
return ResultAlreadyClosed;
}

if (result == ResultOk) {
Expand Down Expand Up @@ -259,6 +276,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
}
lock.unlock();
producerCreatedPromise_.setFailed(result);
handleResult = result;
} else if (producerCreatedPromise_.isComplete()) {
if (result == ResultProducerBlockedQuotaExceededException) {
LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
Expand All @@ -269,22 +287,23 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r

// Producer had already been initially created, we need to retry connecting in any case
LOG_WARN(getName() << "Failed to reconnect producer: " << strResult(result));
scheduleReconnection();
handleResult = ResultRetryable;
} else {
// Producer was not yet created, retry to connect to broker if it's possible
result = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(result)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(result));
scheduleReconnection();
handleResult = convertToTimeoutIfNecessary(result, creationTimestamp_);
if (isResultRetryable(handleResult)) {
LOG_WARN(getName() << "Temporary error in creating producer: " << strResult(handleResult));
} else {
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
failPendingMessages(result, false);
LOG_ERROR(getName() << "Failed to create producer: " << strResult(handleResult));
failPendingMessages(handleResult, false);
state_ = Failed;
lock.unlock();
producerCreatedPromise_.setFailed(result);
producerCreatedPromise_.setFailed(handleResult);
}
}
}

return handleResult;
}

auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) {
Expand Down
6 changes: 3 additions & 3 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase {

// overrided methods from HandlerBase
void beforeConnectionChange(ClientConnection& connection) override;
void connectionOpened(const ClientConnectionPtr& connection) override;
Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) override;
void connectionFailed(Result result) override;
const std::string& getName() const override { return producerStr_; }

private:
void printStats();

void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);

void resendMessages(ClientConnectionPtr cnx);

Expand Down

0 comments on commit 9038327

Please sign in to comment.