Skip to content

Commit

Permalink
Fix creating producer or consumer is not retried for connection failure
Browse files Browse the repository at this point in the history
Fixes #391

### Motivation

When `connectionFailed` is called, no matter if the result is retryable
the creation of producer or consumer will fail without retry.

### Modifications

Check if the result is retryable in `connectionFailed` for
`ProducerImpl` and `ConsumerImpl` and only fail for non-retryable errors
or the timeout error. Register another timer in `HandlerBase` to
propagate the timeout error to `connectionFailed`.

Add `testRetryUntilSucceed`, `testRetryTimeout`, `testNoRetry` to verify
client could retry according to the result returned by
`ClientImpl::getConnection`.
  • Loading branch information
BewareMyPower committed Feb 6, 2024
1 parent 1f94dd9 commit 8528a23
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 20 deletions.
6 changes: 3 additions & 3 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ static Result getResult(ServerError serverError, const std::string& message) {
return ResultConsumerBusy;

case ServiceNotReady:
// If the error is not caused by a PulsarServerException, treat it as retryable.
return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable
: ResultServiceUnitNotReady;
return (message.find("the broker do not have test listener") != std::string::npos)
? ResultConnectError
: ResultServiceUnitNotReady;

case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
Expand Down
4 changes: 3 additions & 1 deletion lib/ClientConnectionAdaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ inline void checkServerError(Connection& connection, ServerError error, const st
// "Namespace bundle ... is being unloaded"
// "KeeperException$..."
// "Failed to acquire ownership for namespace bundle ..."
// "the broker do not have test listener"
// Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages
// is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd
// message is ServiceNotReady.
if (message.find("Failed to acquire ownership") == std::string::npos &&
message.find("KeeperException") == std::string::npos &&
message.find("is being unloaded") == std::string::npos) {
message.find("is being unloaded") == std::string::npos &&
message.find("the broker do not have test listener") == std::string::npos) {
connection.close(ResultDisconnected);
}
break;
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}

Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;

const auto topicNamePtr = TopicName::get(topic);
Expand Down Expand Up @@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr
}
}

Future<Result, ClientConnectionPtr> ClientImpl::connect(const std::string& logicalAddress, size_t key) {
GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) {
const auto& physicalAddress = getPhysicalAddress(logicalAddress);
Promise<Result, ClientConnectionPtr> promise;
pool_.getConnectionAsync(logicalAddress, physicalAddress, key)
Expand Down
8 changes: 5 additions & 3 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ class TopicName;
using TopicNamePtr = std::shared_ptr<TopicName>;

using NamespaceTopicsPtr = std::shared_ptr<std::vector<std::string>>;
using GetConnectionFuture = Future<Result, ClientConnectionPtr>;

std::string generateRandomName();

class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
~ClientImpl();
virtual ~ClientImpl();

/**
* @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema
Expand All @@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {

void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);

Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
// Use virtual method to test
virtual GetConnectionFuture getConnection(const std::string& topic, size_t key);

Future<Result, ClientConnectionPtr> connect(const std::string& logicalAddress, size_t key);
GetConnectionFuture connect(const std::string& logicalAddress, size_t key);

void closeAsync(CloseCallback callback);
void shutdown();
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
auto ptr = get_shared_this_ptr();

if (consumerCreatedPromise_.setFailed(result)) {
if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) {
state_ = Failed;
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/Future.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ class Future {

Result get(Type &result) { return state_->get(result); }

static Future<Result, Type> failed(Result result);

private:
InternalStatePtr<Result, Type> state_;

Expand Down Expand Up @@ -144,6 +146,13 @@ class Promise {
InternalStatePtr<Result, Type> state_;
};

template <typename Result, typename Type>
inline Future<Result, Type> Future<Result, Type>::failed(Result result) {
Promise<Result, Type> promise;
promise.setFailed(result);
return promise.getFuture();
}

} // namespace pulsar

#endif
21 changes: 18 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,31 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
backoff_(backoff),
epoch_(0),
timer_(executor_->createDeadlineTimer()),
creationTimer_(executor_->createDeadlineTimer()),
reconnectionPending_(false) {}

HandlerBase::~HandlerBase() { timer_->cancel(); }
HandlerBase::~HandlerBase() {
ASIO_ERROR ignored;
timer_->cancel(ignored);
creationTimer_->cancel(ignored);
}

void HandlerBase::start() {
// guard against concurrent state changes such as closing
State state = NotStarted;
if (state_.compare_exchange_strong(state, Pending)) {
grabCnx();
}
creationTimer_->expires_from_now(operationTimeut_);
std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
connectionFailed(ResultTimeout);
ASIO_ERROR ignored;
timer_->cancel(ignored);
}
});
}

ClientConnectionWeakPtr HandlerBase::getCnx() const {
Expand Down Expand Up @@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
ClientImplPtr client = client_.lock();
if (!client) {
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
connectionFailed(ResultConnectError);
connectionFailed(ResultAlreadyClosed);
reconnectionPending_ = false;
return;
}
Expand All @@ -108,7 +123,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
connectionOpened(cnx).addListener([this, self](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (isResultRetryable(result)) {
if (result != ResultOk && isResultRetryable(result)) {
scheduleReconnection();
}
});
Expand Down
1 change: 1 addition & 0 deletions lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {

private:
DeadlineTimerPtr timer_;
DeadlineTimerPtr creationTimer_;

mutable std::mutex connectionMutex_;
std::atomic<bool> reconnectionPending_;
Expand Down
2 changes: 1 addition & 1 deletion lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) {
// if producers are lazy, then they should always try to restart
// so don't change the state and allow reconnections
return;
} else if (producerCreatedPromise_.setFailed(result)) {
} else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) {
state_ = Failed;
}
}
Expand Down
29 changes: 28 additions & 1 deletion lib/ResultUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,39 @@
*/
#pragma once

#include <assert.h>
#include <pulsar/Result.h>

#include <unordered_set>

namespace pulsar {

inline bool isResultRetryable(Result result) {
return result == ResultRetryable || result == ResultDisconnected;
assert(result != ResultOk);
if (result == ResultRetryable || result == ResultDisconnected) {
return true;
}

static const std::unordered_set<Result> fatalResults{ResultConnectError,
ResultTimeout,
ResultAuthenticationError,
ResultAuthorizationError,
ResultInvalidUrl,
ResultInvalidConfiguration,
ResultIncompatibleSchema,
ResultTopicNotFound,
ResultOperationNotSupported,
ResultNotAllowedError,
ResultChecksumError,
ResultCryptoError,
ResultConsumerAssignError,
ResultProducerBusy,
ResultConsumerBusy,
ResultLookupError,
ResultTooManyLookupRequestException,
ResultProducerBlockedQuotaExceededException,
ResultProducerBlockedQuotaExceededError};
return fatalResults.find(result) == fatalResults.cend();
}

} // namespace pulsar
75 changes: 71 additions & 4 deletions tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <future>
#include <sstream>

#include "MockClientImpl.h"
#include "PulsarAdminHelper.h"
#include "PulsarFriend.h"
#include "WaitUtils.h"
Expand All @@ -36,6 +37,7 @@
DECLARE_LOG_OBJECT()

using namespace pulsar;
using testing::AtLeast;

static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
Expand Down Expand Up @@ -248,7 +250,7 @@ TEST(ClientTest, testWrongListener) {

Client client(lookupUrl, ClientConfiguration().setListenerName("test"));
Producer producer;
ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer));
ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
ASSERT_EQ(ResultProducerNotInitialized, producer.close());
ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
ASSERT_EQ(ResultOk, client.close());
Expand All @@ -257,7 +259,7 @@ TEST(ClientTest, testWrongListener) {
// creation of Consumer or Reader could fail with ResultConnectError.
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
Consumer consumer;
ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer));
ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());

ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
Expand All @@ -266,7 +268,7 @@ TEST(ClientTest, testWrongListener) {
client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));

Consumer multiTopicsConsumer;
ASSERT_EQ(ResultServiceUnitNotReady,
ASSERT_EQ(ResultConnectError,
client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"},
"sub", multiTopicsConsumer));

Expand All @@ -278,7 +280,7 @@ TEST(ClientTest, testWrongListener) {

// Currently Reader can only read a non-partitioned topic in C++ client
Reader reader;
ASSERT_EQ(ResultServiceUnitNotReady,
ASSERT_EQ(ResultConnectError,
client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader));
ASSERT_EQ(ResultConsumerNotInitialized, reader.close());
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
Expand Down Expand Up @@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) {
client.close();
}
}

TEST(ClientTest, testRetryUntilSucceed) {
auto clientImpl = std::make_shared<MockClientImpl>(lookupUrl);
constexpr int kFailCount = 3;
EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2);
std::atomic_int count{0};
ON_CALL(*clientImpl, getConnection)
.WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) {
if (count++ < kFailCount) {
return GetConnectionFuture::failed(ResultRetryable);
}
return clientImpl->getConnectionReal(topic, index);
});

auto topic = "client-test-retry-until-succeed";
ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result);
count = 0;
ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result);
ASSERT_EQ(ResultOk, clientImpl->close());
}

TEST(ClientTest, testRetryTimeout) {
auto clientImpl =
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2));
EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2));
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) {
return GetConnectionFuture::failed(ResultRetryable);
});

auto topic = "client-test-retry-timeout";
{
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
ASSERT_EQ(ResultTimeout, result.result);
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms";
}
{
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
ASSERT_EQ(ResultTimeout, result.result);
ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms";
}

ASSERT_EQ(ResultOk, clientImpl->close());
}

TEST(ClientTest, testNoRetry) {
auto clientImpl =
std::make_shared<MockClientImpl>(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100));
EXPECT_CALL(*clientImpl, getConnection).Times(2);
ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) {
return GetConnectionFuture::failed(ResultAuthenticationError);
});

auto topic = "client-test-no-retry";
{
MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic);
ASSERT_EQ(ResultAuthenticationError, result.result);
ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms";
}
{
MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic);
LOG_INFO("It takes " << result.timeMs << " ms to subscribe");
ASSERT_EQ(ResultAuthenticationError, result.result);
ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms";
}
}
Loading

0 comments on commit 8528a23

Please sign in to comment.