From 68b4244f321345928e0943387c1dca7b5f448d2c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 7 Feb 2024 10:26:36 +0800 Subject: [PATCH] Fix creating producer or consumer is not retried for connection failure (#396) Fixes https://github.com/apache/pulsar-client-cpp/issues/391 ### Motivation When `connectionFailed` is called, no matter if the result is retryable the creation of producer or consumer will fail without retry. ### Modifications Check if the result is retryable in `connectionFailed` for `ProducerImpl` and `ConsumerImpl` and only fail for non-retryable errors or the timeout error. Register another timer in `HandlerBase` to propagate the timeout error to `connectionFailed`. Add `testRetryUntilSucceed`, `testRetryTimeout`, `testNoRetry` to verify client could retry according to the result returned by `ClientImpl::getConnection`. --- lib/ClientConnection.cc | 11 +++-- lib/ClientConnectionAdaptor.h | 4 +- lib/ClientImpl.cc | 4 +- lib/ClientImpl.h | 8 +-- lib/ConsumerImpl.cc | 2 +- lib/Future.h | 9 ++++ lib/HandlerBase.cc | 21 ++++++-- lib/HandlerBase.h | 1 + lib/ProducerImpl.cc | 2 +- lib/ResultUtils.h | 29 ++++++++++- tests/ClientTest.cc | 75 ++++++++++++++++++++++++++-- tests/MockClientImpl.h | 92 +++++++++++++++++++++++++++++++++++ 12 files changed, 239 insertions(+), 19 deletions(-) create mode 100644 tests/MockClientImpl.h diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 23e60225..00041b2a 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -83,7 +83,7 @@ static Result getResult(ServerError serverError, const std::string& message) { case ServiceNotReady: return (message.find("the broker do not have test listener") == std::string::npos) ? ResultRetryable - : ResultServiceUnitNotReady; + : ResultConnectError; case ProducerBlockedQuotaExceededError: return ResultProducerBlockedQuotaExceededError; @@ -508,8 +508,13 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver:: void ClientConnection::handleHandshake(const ASIO_ERROR& err) { if (err) { - LOG_ERROR(cnxString_ << "Handshake failed: " << err.message()); - close(); + if (err.value() == ASIO::ssl::error::stream_truncated) { + LOG_WARN(cnxString_ << "Handshake failed: " << err.message()); + close(ResultRetryable); + } else { + LOG_ERROR(cnxString_ << "Handshake failed: " << err.message()); + close(); + } return; } diff --git a/lib/ClientConnectionAdaptor.h b/lib/ClientConnectionAdaptor.h index 8f5599fb..2c299373 100644 --- a/lib/ClientConnectionAdaptor.h +++ b/lib/ClientConnectionAdaptor.h @@ -37,12 +37,14 @@ inline void checkServerError(Connection& connection, ServerError error, const st // "Namespace bundle ... is being unloaded" // "KeeperException$..." // "Failed to acquire ownership for namespace bundle ..." + // "the broker do not have test listener" // Before https://github.com/apache/pulsar/pull/21211, the error of the 1st and 2nd messages // is ServiceNotReady. Before https://github.com/apache/pulsar/pull/21993, the error of the 3rd // message is ServiceNotReady. if (message.find("Failed to acquire ownership") == std::string::npos && message.find("KeeperException") == std::string::npos && - message.find("is being unloaded") == std::string::npos) { + message.find("is being unloaded") == std::string::npos && + message.find("the broker do not have test listener") == std::string::npos) { connection.close(ResultDisconnected); } break; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index 63c85b0f..ae339731 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -517,7 +517,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co } } -Future ClientImpl::getConnection(const std::string& topic, size_t key) { +GetConnectionFuture ClientImpl::getConnection(const std::string& topic, size_t key) { Promise promise; const auto topicNamePtr = TopicName::get(topic); @@ -562,7 +562,7 @@ const std::string& ClientImpl::getPhysicalAddress(const std::string& logicalAddr } } -Future ClientImpl::connect(const std::string& logicalAddress, size_t key) { +GetConnectionFuture ClientImpl::connect(const std::string& logicalAddress, size_t key) { const auto& physicalAddress = getPhysicalAddress(logicalAddress); Promise promise; pool_.getConnectionAsync(logicalAddress, physicalAddress, key) diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index a2649a6a..7126542b 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -63,13 +63,14 @@ class TopicName; using TopicNamePtr = std::shared_ptr; using NamespaceTopicsPtr = std::shared_ptr>; +using GetConnectionFuture = Future; std::string generateRandomName(); class ClientImpl : public std::enable_shared_from_this { public: ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration); - ~ClientImpl(); + virtual ~ClientImpl(); /** * @param autoDownloadSchema When it is true, Before creating a producer, it will try to get the schema @@ -95,9 +96,10 @@ class ClientImpl : public std::enable_shared_from_this { void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback); - Future getConnection(const std::string& topic, size_t key); + // Use virtual method to test + virtual GetConnectionFuture getConnection(const std::string& topic, size_t key); - Future connect(const std::string& logicalAddress, size_t key); + GetConnectionFuture connect(const std::string& logicalAddress, size_t key); void closeAsync(CloseCallback callback); void shutdown(); diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 86dddb0d..0051ed6d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -277,7 +277,7 @@ void ConsumerImpl::connectionFailed(Result result) { // Keep a reference to ensure object is kept alive auto ptr = get_shared_this_ptr(); - if (consumerCreatedPromise_.setFailed(result)) { + if (!isResultRetryable(result) && consumerCreatedPromise_.setFailed(result)) { state_ = Failed; } } diff --git a/lib/Future.h b/lib/Future.h index 69db74a3..22d43cbe 100644 --- a/lib/Future.h +++ b/lib/Future.h @@ -116,6 +116,8 @@ class Future { Result get(Type &result) { return state_->get(result); } + static Future failed(Result result); + private: InternalStatePtr state_; @@ -144,6 +146,13 @@ class Promise { InternalStatePtr state_; }; +template +inline Future Future::failed(Result result) { + Promise promise; + promise.setFailed(result); + return promise.getFuture(); +} + } // namespace pulsar #endif diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index c5327fcf..52e20d2d 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -42,9 +42,14 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, backoff_(backoff), epoch_(0), timer_(executor_->createDeadlineTimer()), + creationTimer_(executor_->createDeadlineTimer()), reconnectionPending_(false) {} -HandlerBase::~HandlerBase() { timer_->cancel(); } +HandlerBase::~HandlerBase() { + ASIO_ERROR ignored; + timer_->cancel(ignored); + creationTimer_->cancel(ignored); +} void HandlerBase::start() { // guard against concurrent state changes such as closing @@ -52,6 +57,16 @@ void HandlerBase::start() { if (state_.compare_exchange_strong(state, Pending)) { grabCnx(); } + creationTimer_->expires_from_now(operationTimeut_); + std::weak_ptr weakSelf{shared_from_this()}; + creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { + auto self = weakSelf.lock(); + if (self && !error) { + connectionFailed(ResultTimeout); + ASIO_ERROR ignored; + timer_->cancel(ignored); + } + }); } ClientConnectionWeakPtr HandlerBase::getCnx() const { @@ -96,7 +111,7 @@ void HandlerBase::grabCnx(const boost::optional& assignedBrokerUrl) ClientImplPtr client = client_.lock(); if (!client) { LOG_WARN(getName() << "Client is invalid when calling grabCnx()"); - connectionFailed(ResultConnectError); + connectionFailed(ResultAlreadyClosed); reconnectionPending_ = false; return; } @@ -108,7 +123,7 @@ void HandlerBase::grabCnx(const boost::optional& assignedBrokerUrl) connectionOpened(cnx).addListener([this, self](Result result, bool) { // Do not use bool, only Result. reconnectionPending_ = false; - if (isResultRetryable(result)) { + if (result != ResultOk && isResultRetryable(result)) { scheduleReconnection(); } }); diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index b9d98553..32e124a2 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -140,6 +140,7 @@ class HandlerBase : public std::enable_shared_from_this { private: DeadlineTimerPtr timer_; + DeadlineTimerPtr creationTimer_; mutable std::mutex connectionMutex_; std::atomic reconnectionPending_; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 14a74c63..890d476c 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -176,7 +176,7 @@ void ProducerImpl::connectionFailed(Result result) { // if producers are lazy, then they should always try to restart // so don't change the state and allow reconnections return; - } else if (producerCreatedPromise_.setFailed(result)) { + } else if (!isResultRetryable(result) && producerCreatedPromise_.setFailed(result)) { state_ = Failed; } } diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h index b5ec6cdb..dfba7eb0 100644 --- a/lib/ResultUtils.h +++ b/lib/ResultUtils.h @@ -18,12 +18,39 @@ */ #pragma once +#include #include +#include + namespace pulsar { inline bool isResultRetryable(Result result) { - return result == ResultRetryable || result == ResultDisconnected; + assert(result != ResultOk); + if (result == ResultRetryable || result == ResultDisconnected) { + return true; + } + + static const std::unordered_set fatalResults{ResultConnectError, + ResultTimeout, + ResultAuthenticationError, + ResultAuthorizationError, + ResultInvalidUrl, + ResultInvalidConfiguration, + ResultIncompatibleSchema, + ResultTopicNotFound, + ResultOperationNotSupported, + ResultNotAllowedError, + ResultChecksumError, + ResultCryptoError, + ResultConsumerAssignError, + ResultProducerBusy, + ResultConsumerBusy, + ResultLookupError, + ResultTooManyLookupRequestException, + ResultProducerBlockedQuotaExceededException, + ResultProducerBlockedQuotaExceededError}; + return fatalResults.find(static_cast(result)) == fatalResults.cend(); } } // namespace pulsar diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 8c5a32ab..1e8e67d5 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -25,6 +25,7 @@ #include #include +#include "MockClientImpl.h" #include "PulsarAdminHelper.h" #include "PulsarFriend.h" #include "WaitUtils.h" @@ -36,6 +37,7 @@ DECLARE_LOG_OBJECT() using namespace pulsar; +using testing::AtLeast; static std::string lookupUrl = "pulsar://localhost:6650"; static std::string adminUrl = "http://localhost:8080/"; @@ -248,7 +250,7 @@ TEST(ClientTest, testWrongListener) { Client client(lookupUrl, ClientConfiguration().setListenerName("test")); Producer producer; - ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer)); + ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer)); ASSERT_EQ(ResultProducerNotInitialized, producer.close()); ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); ASSERT_EQ(ResultOk, client.close()); @@ -257,7 +259,7 @@ TEST(ClientTest, testWrongListener) { // creation of Consumer or Reader could fail with ResultConnectError. client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); Consumer consumer; - ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer)); + ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer)); ASSERT_EQ(ResultConsumerNotInitialized, consumer.close()); ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); @@ -266,7 +268,7 @@ TEST(ClientTest, testWrongListener) { client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); Consumer multiTopicsConsumer; - ASSERT_EQ(ResultServiceUnitNotReady, + ASSERT_EQ(ResultConnectError, client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"}, "sub", multiTopicsConsumer)); @@ -278,7 +280,7 @@ TEST(ClientTest, testWrongListener) { // Currently Reader can only read a non-partitioned topic in C++ client Reader reader; - ASSERT_EQ(ResultServiceUnitNotReady, + ASSERT_EQ(ResultConnectError, client.createReader(topic + "-partition-0", MessageId::earliest(), {}, reader)); ASSERT_EQ(ResultConsumerNotInitialized, reader.close()); ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); @@ -434,3 +436,68 @@ TEST(ClientTest, testConnectionClose) { client.close(); } } + +TEST(ClientTest, testRetryUntilSucceed) { + auto clientImpl = std::make_shared(lookupUrl); + constexpr int kFailCount = 3; + EXPECT_CALL(*clientImpl, getConnection).Times((kFailCount + 1) * 2); + std::atomic_int count{0}; + ON_CALL(*clientImpl, getConnection) + .WillByDefault([&clientImpl, &count](const std::string &topic, size_t index) { + if (count++ < kFailCount) { + return GetConnectionFuture::failed(ResultRetryable); + } + return clientImpl->getConnectionReal(topic, index); + }); + + auto topic = "client-test-retry-until-succeed"; + ASSERT_EQ(ResultOk, clientImpl->createProducer(topic).result); + count = 0; + ASSERT_EQ(ResultOk, clientImpl->subscribe(topic).result); + ASSERT_EQ(ResultOk, clientImpl->close()); +} + +TEST(ClientTest, testRetryTimeout) { + auto clientImpl = + std::make_shared(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(2)); + EXPECT_CALL(*clientImpl, getConnection).Times(AtLeast(2 * 2)); + ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &topic, size_t index) { + return GetConnectionFuture::failed(ResultRetryable); + }); + + auto topic = "client-test-retry-timeout"; + { + MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic); + ASSERT_EQ(ResultTimeout, result.result); + ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "producer: " << result.timeMs << " ms"; + } + { + MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic); + ASSERT_EQ(ResultTimeout, result.result); + ASSERT_TRUE(result.timeMs >= 2000 && result.timeMs < 2100) << "consumer: " << result.timeMs << " ms"; + } + + ASSERT_EQ(ResultOk, clientImpl->close()); +} + +TEST(ClientTest, testNoRetry) { + auto clientImpl = + std::make_shared(lookupUrl, ClientConfiguration().setOperationTimeoutSeconds(100)); + EXPECT_CALL(*clientImpl, getConnection).Times(2); + ON_CALL(*clientImpl, getConnection).WillByDefault([](const std::string &, size_t) { + return GetConnectionFuture::failed(ResultAuthenticationError); + }); + + auto topic = "client-test-no-retry"; + { + MockClientImpl::SyncOpResult result = clientImpl->createProducer(topic); + ASSERT_EQ(ResultAuthenticationError, result.result); + ASSERT_TRUE(result.timeMs < 1000) << "producer: " << result.timeMs << " ms"; + } + { + MockClientImpl::SyncOpResult result = clientImpl->subscribe(topic); + LOG_INFO("It takes " << result.timeMs << " ms to subscribe"); + ASSERT_EQ(ResultAuthenticationError, result.result); + ASSERT_TRUE(result.timeMs < 1000) << "consumer: " << result.timeMs << " ms"; + } +} diff --git a/tests/MockClientImpl.h b/tests/MockClientImpl.h new file mode 100644 index 00000000..aa4208a9 --- /dev/null +++ b/tests/MockClientImpl.h @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include +#include + +#include +#include +#include + +#include "lib/ClientImpl.h" + +namespace pulsar { + +class MockClientImpl : public ClientImpl { + public: + struct SyncOpResult { + Result result; + long timeMs; + }; + using PromisePtr = std::shared_ptr>; + MockClientImpl(const std::string& serviceUrl, ClientConfiguration conf = {}) + : ClientImpl(serviceUrl, conf) {} + + MOCK_METHOD((Future), getConnection, (const std::string&, size_t), + (override)); + + SyncOpResult createProducer(const std::string& topic) { + using namespace std::chrono; + auto start = high_resolution_clock::now(); + auto promise = createPromise(); + createProducerAsync(topic, {}, [&start, promise](Result result, Producer) { + auto timeMs = duration_cast(high_resolution_clock::now() - start).count(); + promise->set_value({result, timeMs}); + }); + return wait(promise); + } + + SyncOpResult subscribe(const std::string& topic) { + using namespace std::chrono; + auto start = std::chrono::high_resolution_clock::now(); + auto promise = createPromise(); + subscribeAsync(topic, "sub", {}, [&start, &promise](Result result, Consumer) { + auto timeMs = duration_cast(high_resolution_clock::now() - start).count(); + promise->set_value({result, timeMs}); + }); + return wait(promise); + } + + GetConnectionFuture getConnectionReal(const std::string& topic, size_t key) { + return ClientImpl::getConnection(topic, key); + } + + Result close() { + auto promise = createPromise(); + closeAsync([promise](Result result) { promise->set_value({result, 0L}); }); + return wait(promise).result; + } + + private: + static PromisePtr createPromise() { return std::make_shared>(); } + + static SyncOpResult wait(const PromisePtr& promise) { + using namespace std::chrono; + auto future = promise->get_future(); + auto status = future.wait_for(std::chrono::seconds(10)); + if (status == std::future_status::ready) { + return future.get(); + } else { + return {ResultUnknownError, -1L}; + } + } +}; + +} // namespace pulsar