Skip to content

Commit

Permalink
Fix crash when removing connection from the pool
Browse files Browse the repository at this point in the history
Fixes #346

### Motivation

#336 changes the key of
the `ClientConnection` in `ConnectionPool`, while in
`ClientConnection::close`, it still passes the old key (logical address)
to `ConnectionPool::remove`, which results in the connection could never
be removed and destroyed until being deleted as a stale connection.

What's worse, if the key does not exist, the iterator returned by
`std::map::find` will still be dereferenced, which might cause crash in
some platforms. See
https://github.com/apache/pulsar-client-cpp/blob/8d32fd254e294d1fabba73aed70115a434b341ef/lib/ConnectionPool.cc#L122-L123

### Modifications

- Avoid dereferencing the iterator if it's invalid in
  `ConnectionPool::remove`.
- Store the key suffix in `ClientConnection` and pass the correct key to
  `ConnectionPool::remove` in `ClientConnection::close`
- Add `ClientTest.testConnectionClose` to verify
  `ClientConnection::close` can remove itself from the pool and the
  connection will be destroyed eventually.
  • Loading branch information
BewareMyPower committed Nov 16, 2023
1 parent 8d32fd2 commit 270d36c
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 9 deletions.
9 changes: 5 additions & 4 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
ExecutorServicePtr executor,
const ClientConfiguration& clientConfiguration,
const AuthenticationPtr& authentication, const std::string& clientVersion,
ConnectionPool& pool)
ConnectionPool& pool, size_t poolIndex)
: operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(proto::ProtocolVersion_MIN),
Expand All @@ -184,7 +184,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
clientVersion_(clientVersion),
pool_(pool) {
pool_(pool),
poolIndex_(poolIndex) {
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
Expand Down Expand Up @@ -265,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
}

ClientConnection::~ClientConnection() {
LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_);
LOG_INFO(cnxString_ << "Destroyed connection to " << logicalAddress_ << "-" << poolIndex_);
}

void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdConnected) {
Expand Down Expand Up @@ -1320,7 +1321,7 @@ void ClientConnection::close(Result result, bool detach) {
}
// Remove the connection from the pool before completing any promise
if (detach) {
pool_.remove(logicalAddress_, this); // trigger the destructor
pool_.remove(logicalAddress_ + "-" + std::to_string(poolIndex_), this);
}

auto self = shared_from_this();
Expand Down
4 changes: 3 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
const AuthenticationPtr& authentication, const std::string& clientVersion,
ConnectionPool& pool);
ConnectionPool& pool, size_t poolIndex);
~ClientConnection();

#if __cplusplus < 201703L
Expand Down Expand Up @@ -400,6 +400,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

const std::string clientVersion_;
ConnectionPool& pool_;
const size_t poolIndex_;

friend class PulsarFriend;

void checkServerError(ServerError error);
Expand Down
9 changes: 5 additions & 4 deletions lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
return cnx->getConnectFuture();
} else {
// The closed connection should have been removed from the pool in ClientConnection::close
LOG_WARN("Deleting stale connection from pool for " << key << " use_count: " << (cnx.use_count())
LOG_INFO("Deleting stale connection from pool for " << key << " use_count: " << (cnx.use_count())
<< " @ " << cnx.get());
pool_.erase(key);
}
Expand All @@ -97,7 +97,8 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
ClientConnectionPtr cnx;
try {
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
clientConfiguration_, authentication_, clientVersion_, *this));
clientConfiguration_, authentication_, clientVersion_, *this,
keySuffix));
} catch (const std::runtime_error& e) {
lock.unlock();
LOG_ERROR("Failed to create connection: " << e.what())
Expand All @@ -106,7 +107,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
return promise.getFuture();
}

LOG_INFO("Created connection for " << logicalAddress);
LOG_INFO("Created connection for " << key);

Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
pool_.insert(std::make_pair(key, cnx));
Expand All @@ -120,7 +121,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const
void ConnectionPool::remove(const std::string& key, ClientConnection* value) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto it = pool_.find(key);
if (it->second.get() == value) {
if (it != pool_.end() && it->second.get() == value) {
LOG_INFO("Remove connection for " << key);
pool_.erase(it);
}
Expand Down
41 changes: 41 additions & 0 deletions tests/ClientTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,44 @@ TEST(ClientTest, testClientVersion) {

client.close();
}

TEST(ClientTest, testConnectionClose) {
std::vector<Client> clients;
clients.emplace_back(lookupUrl);
clients.emplace_back(lookupUrl, ClientConfiguration().setConnectionsPerBroker(5));

const auto topic = "client-test-connection-close";
for (auto &client : clients) {
auto testClose = [&client](ClientConnectionWeakPtr weakCnx) {
auto cnx = weakCnx.lock();
ASSERT_TRUE(cnx);

auto numConnections = PulsarFriend::getConnections(client).size();
LOG_INFO("Connection refcnt: " << cnx.use_count() << " before close");
auto executor = PulsarFriend::getExecutor(*cnx);
// Simulate the close() happens in the event loop
executor->postWork([cnx, &client, numConnections] {
cnx->close();
ASSERT_EQ(PulsarFriend::getConnections(client).size(), numConnections - 1);
LOG_INFO("Connection refcnt: " << cnx.use_count() << " after close");
});
cnx.reset();

// The ClientConnection could still be referred in a socket callback, wait until all these
// callbacks being cancelled due to the socket close.
ASSERT_TRUE(waitUntil(
std::chrono::seconds(1), [weakCnx] { return weakCnx.expired(); }, 1));
};
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
testClose(PulsarFriend::getProducerImpl(producer).getCnx());
producer.close();

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe("client-test-connection-close", "sub", consumer));
testClose(PulsarFriend::getConsumerImpl(consumer).getCnx());
consumer.close();

client.close();
}
}
2 changes: 2 additions & 0 deletions tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class PulsarFriend {
return connections;
}

static ExecutorServicePtr getExecutor(const ClientConnection& cnx) { return cnx.executor_; }

static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
std::vector<ProducerImplPtr> producers;
std::lock_guard<std::mutex> lock(cnx.mutex_);
Expand Down

0 comments on commit 270d36c

Please sign in to comment.