From 270d36c29e525deccd9d9913300f271450f3032e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 16 Nov 2023 11:49:20 +0800 Subject: [PATCH] Fix crash when removing connection from the pool Fixes https://github.com/apache/pulsar-client-cpp/issues/346 ### Motivation https://github.com/apache/pulsar-client-cpp/pull/336 changed the key of the `ClientConnection` in `ConnectionPool`, but `ClientConnection::close` still passes the old key (the logical address) to `ConnectionPool::remove`. As a result, the connection is never removed from the pool and destroyed until it is deleted as a stale connection. What's worse, if the key does not exist, the iterator returned by `std::map::find` is still dereferenced, which might cause a crash on some platforms. See https://github.com/apache/pulsar-client-cpp/blob/8d32fd254e294d1fabba73aed70115a434b341ef/lib/ConnectionPool.cc#L122-L123 ### Modifications - Avoid dereferencing the iterator in `ConnectionPool::remove` if it's invalid. - Store the key suffix in `ClientConnection` and pass the correct key to `ConnectionPool::remove` in `ClientConnection::close`. - Add `ClientTest.testConnectionClose` to verify that `ClientConnection::close` removes the connection from the pool and that the connection is eventually destroyed. 
--- lib/ClientConnection.cc | 9 +++++---- lib/ClientConnection.h | 4 +++- lib/ConnectionPool.cc | 9 +++++---- tests/ClientTest.cc | 41 +++++++++++++++++++++++++++++++++++++++++ tests/PulsarFriend.h | 2 ++ 5 files changed, 56 insertions(+), 9 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 8f42535c..fbcbe626 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -160,7 +160,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication, const std::string& clientVersion, - ConnectionPool& pool) + ConnectionPool& pool, size_t poolIndex) : operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())), authentication_(authentication), serverProtocolVersion_(proto::ProtocolVersion_MIN), @@ -184,7 +184,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: consumerStatsRequestTimer_(executor_->createDeadlineTimer()), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), clientVersion_(clientVersion), - pool_(pool) { + pool_(pool), + poolIndex_(poolIndex) { LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout()); if (clientConfiguration.isUseTls()) { #if BOOST_VERSION >= 105400 @@ -265,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: } ClientConnection::~ClientConnection() { - LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_); + LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_); } void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) { @@ -1320,7 +1321,7 @@ void ClientConnection::close(Result result, bool detach) { } // Remove the connection from the pool before completing any promise if (detach) { - pool_.remove(logicalAddress_, 
this); // trigger the destructor + pool_.remove(logicalAddress_ + "-" + std::to_string(poolIndex_), this); } auto self = shared_from_this(); diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 30ea8d82..965c6aec 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -129,7 +129,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this ConnectionPool::getConnectionAsync(const return cnx->getConnectFuture(); } else { // The closed connection should have been removed from the pool in ClientConnection::close - LOG_WARN("Deleting stale connection from pool for " << key << " use_count: " << (cnx.use_count()) + LOG_INFO("Deleting stale connection from pool for " << key << " use_count: " << (cnx.use_count()) << " @ " << cnx.get()); pool_.erase(key); } @@ -97,7 +97,8 @@ Future ConnectionPool::getConnectionAsync(const ClientConnectionPtr cnx; try { cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix), - clientConfiguration_, authentication_, clientVersion_, *this)); + clientConfiguration_, authentication_, clientVersion_, *this, + keySuffix)); } catch (const std::runtime_error& e) { lock.unlock(); LOG_ERROR("Failed to create connection: " << e.what()) @@ -106,7 +107,7 @@ Future ConnectionPool::getConnectionAsync(const return promise.getFuture(); } - LOG_INFO("Created connection for " << logicalAddress); + LOG_INFO("Created connection for " << key); Future future = cnx->getConnectFuture(); pool_.insert(std::make_pair(key, cnx)); @@ -120,7 +121,7 @@ Future ConnectionPool::getConnectionAsync(const void ConnectionPool::remove(const std::string& key, ClientConnection* value) { std::lock_guard lock(mutex_); auto it = pool_.find(key); - if (it->second.get() == value) { + if (it != pool_.end() && it->second.get() == value) { LOG_INFO("Remove connection for " << key); pool_.erase(it); } diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 983e5d91..5b4dd2e8 100644 --- 
a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -400,3 +400,44 @@ TEST(ClientTest, testClientVersion) { client.close(); } + +TEST(ClientTest, testConnectionClose) { + std::vector clients; + clients.emplace_back(lookupUrl); + clients.emplace_back(lookupUrl, ClientConfiguration().setConnectionsPerBroker(5)); + + const auto topic = "client-test-connection-close"; + for (auto &client : clients) { + auto testClose = [&client](ClientConnectionWeakPtr weakCnx) { + auto cnx = weakCnx.lock(); + ASSERT_TRUE(cnx); + + auto numConnections = PulsarFriend::getConnections(client).size(); + LOG_INFO("Connection refcnt: " << cnx.use_count() << " before close"); + auto executor = PulsarFriend::getExecutor(*cnx); + // Simulate the close() happens in the event loop + executor->postWork([cnx, &client, numConnections] { + cnx->close(); + ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1); + LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close"); + }); + cnx.reset(); + + // The ClientConnection could still be referred in a socket callback, wait until all these + // callbacks being cancelled due to the socket close. 
+ ASSERT_TRUE(waitUntil( + std::chrono::seconds(1), [weakCnx] { return weakCnx.expired(); }, 1)); + }; + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + testClose(PulsarFriend::getProducerImpl(producer).getCnx()); + producer.close(); + + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe("client-test-connection-close", "sub", consumer)); + testClose(PulsarFriend::getConsumerImpl(consumer).getCnx()); + consumer.close(); + + client.close(); + } +} diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index de82ce47..c2863e8c 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -140,6 +140,8 @@ class PulsarFriend { return connections; } + static ExecutorServicePtr getExecutor(const ClientConnection& cnx) { return cnx.executor_; } + static std::vector getProducers(const ClientConnection& cnx) { std::vector producers; std::lock_guard lock(cnx.mutex_);