Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Memory leak during GET_LAST_MESSAGE_ID command processing. #301

Merged
merged 3 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,13 @@ void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec,
}
}

void ClientConnection::handleGetLastMessageIdTimeout(const boost::system::error_code& ec,
ClientConnection::LastMessageIdRequestData data) {
if (!ec) {
data.promise->setFailed(ResultTimeout);
}
}

void ClientConnection::handleKeepAliveTimeout() {
if (isClosed()) {
return;
Expand Down Expand Up @@ -1251,7 +1258,7 @@ void ClientConnection::close(Result result) {
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
kv.second.setFailed(result);
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
kv.second.setFailed(result);
Expand Down Expand Up @@ -1299,23 +1306,24 @@ Commands::ChecksumType ClientConnection::getChecksumType() const {
Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(uint64_t consumerId,
uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, GetLastMessageIdResponse> promise;
auto promise = std::make_shared<GetLastMessageIdResponsePromisePtr::element_type>();
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
promise->setFailed(ResultNotConnected);
return promise->getFuture();
}

pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, promise));
LastMessageIdRequestData requestData;
requestData.promise = promise;
requestData.timer = executor_->createDeadlineTimer();
requestData.timer->expires_from_now(operationsTimeout_);
requestData.timer->async_wait(std::bind(&ClientConnection::handleGetLastMessageIdTimeout,
shared_from_this(), std::placeholders::_1, requestData));
pendingGetLastMessageIdRequests_.insert(std::make_pair(requestId, requestData));
lock.unlock();
sendRequestWithId(Commands::newGetLastMessageId(consumerId, requestId), requestId)
.addListener([promise](Result result, const ResponseData& data) {
if (result != ResultOk) {
promise.setFailed(result);
}
});
return promise.getFuture();
sendCommand(Commands::newGetLastMessageId(consumerId, requestId));
return promise->getFuture();
}

Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(
Expand Down Expand Up @@ -1635,11 +1643,11 @@ void ClientConnection::handleError(const proto::CommandError& error) {
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
auto getLastMessageIdPromise = it->second;
auto getLastMessageIdPromise = it->second.promise;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

getLastMessageIdPromise.setFailed(result);
getLastMessageIdPromise->setFailed(result);
} else {
PendingGetNamespaceTopicsMap::iterator it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
Expand Down Expand Up @@ -1719,16 +1727,16 @@ void ClientConnection::handleGetLastMessageIdResponse(
auto it = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());

if (it != pendingGetLastMessageIdRequests_.end()) {
auto getLastMessageIdPromise = it->second;
auto getLastMessageIdPromise = it->second.promise;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

if (getLastMessageIdResponse.has_consumer_mark_delete_position()) {
getLastMessageIdPromise.setValue(
getLastMessageIdPromise->setValue(
{toMessageId(getLastMessageIdResponse.last_message_id()),
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
} else {
getLastMessageIdPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
getLastMessageIdPromise->setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
}
} else {
lock.unlock();
Expand Down
9 changes: 8 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
DeadlineTimerPtr timer;
};

struct LastMessageIdRequestData {
GetLastMessageIdResponsePromisePtr promise;
DeadlineTimerPtr timer;
};

/*
* handler for connectAsync
* creates a ConnectionPtr which has a valid ClientConnection object
Expand Down Expand Up @@ -243,6 +248,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

void handleLookupTimeout(const boost::system::error_code&, LookupRequestData);

void handleGetLastMessageIdTimeout(const boost::system::error_code&, LastMessageIdRequestData data);

void handleKeepAliveTimeout();

template <typename Handler>
Expand Down Expand Up @@ -342,7 +349,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;

typedef std::map<long, Promise<Result, GetLastMessageIdResponse>> PendingGetLastMessageIdRequestsMap;
typedef std::map<long, LastMessageIdRequestData> PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;

typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
Expand Down
7 changes: 6 additions & 1 deletion lib/GetLastMessageIdResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

#include <pulsar/MessageId.h>
#include <pulsar/Result.h>
#include "Future.h"

#include <iostream>
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved

namespace pulsar {

class GetLastMessageIdResponse;
typedef Promise<Result, GetLastMessageIdResponse> GetLastMessageIdResponsePromise;
typedef std::shared_ptr<GetLastMessageIdResponsePromise> GetLastMessageIdResponsePromisePtr;

class GetLastMessageIdResponse {
friend std::ostream& operator<<(std::ostream& os, const GetLastMessageIdResponse& response) {
os << "lastMessageId: " << response.lastMessageId_;
Expand Down Expand Up @@ -52,7 +57,7 @@ class GetLastMessageIdResponse {
private:
MessageId lastMessageId_;
MessageId markDeletePosition_;
bool hasMarkDeletePosition_;
bool hasMarkDeletePosition_ = false;
};

typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
Expand Down