Skip to content

Commit

Permalink
Fix topic name is shown as a pointer rather than string (#331)
Browse files Browse the repository at this point in the history
### Motivation

This is an additional fix to #329 because I still observed logs like:

```
Closing consumer for topic 0x6000028e0648
Closing producer for topic 0x600001210b88
```

It's because `HandlerBase::topic_` field is protected and could be
accessed directly from the derived classes.

### Motivation

In `HandlerBase`, make `topic_` private and add two methods `topic()`
and `getTopicPtr()` to get the reference to the string and the shared
pointer. `getTopicPtr()` should only be called when being passed to
`MessageImpl::setTopicName`.
  • Loading branch information
BewareMyPower authored Oct 20, 2023
1 parent a0f2d32 commit 7cefe0e
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 25 deletions.
2 changes: 1 addition & 1 deletion lib/BatchMessageContainerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace pulsar {

BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer)
: topicName_(producer.topic_),
: topicName_(producer.topic()),
producerConfig_(producer.conf_),
producerName_(producer.producerName_),
producerId_(producer.producerId_),
Expand Down
2 changes: 1 addition & 1 deletion lib/BatchMessageContainerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class BatchMessageContainerBase : public boost::noncopyable {

protected:
// references to ProducerImpl's fields
const std::shared_ptr<std::string> topicName_;
const std::string topicName_;
const ProducerConfiguration& producerConfig_;
const std::string& producerName_;
const uint64_t& producerId_;
Expand Down
14 changes: 7 additions & 7 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Future<Result, ConsumerImplBaseWeakPtr> ConsumerImpl::getConsumerCreatedFuture()

const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; }

const std::string& ConsumerImpl::getTopic() const { return *topic_; }
const std::string& ConsumerImpl::getTopic() const { return topic(); }

void ConsumerImpl::start() {
HandlerBase::start();
Expand All @@ -194,7 +194,7 @@ void ConsumerImpl::start() {

// Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the
// constructor completed.
if (TopicName::get(*topic_)->isPersistent()) {
if (TopicName::get(topic())->isPersistent()) {
if (config_.getAckGroupingTimeMs() > 0) {
ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled(
connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(),
Expand Down Expand Up @@ -249,7 +249,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd = Commands::newSubscribe(
*topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(),
config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(),
config_.getKeySharedPolicy(), config_.getPriorityLevel());
Expand Down Expand Up @@ -552,7 +552,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::

Message m(messageId, brokerEntryMetadata, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setTopicName(getTopicPtr());
m.impl_->setRedeliveryCount(msg.redelivery_count());

if (metadata.has_schema_version()) {
Expand Down Expand Up @@ -1243,7 +1243,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
return;
}

LOG_INFO(getName() << "Closing consumer for topic " << topic_);
LOG_INFO(getName() << "Closing consumer for topic " << topic());
state_ = Closing;
incomingMessages_.close();

Expand Down Expand Up @@ -1764,7 +1764,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
return;
}
if (result != ResultOk) {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to acknowledge the message {"
<< originMessageId
<< "} of the original topic but send to the DLQ successfully : "
Expand All @@ -1777,7 +1777,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa
}
});
} else {
LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {"
LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {"
<< self->consumerName_ << "} Failed to send DLQ message to {"
<< self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id "
<< "{" << originMessageId << "} : " << res);
Expand Down
6 changes: 3 additions & 3 deletions lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ DECLARE_LOG_OBJECT()
namespace pulsar {

HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: client_(client),
topic_(std::make_shared<std::string>(topic)),
: topic_(std::make_shared<std::string>(topic)),
client_(client),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
Expand Down Expand Up @@ -88,7 +88,7 @@ void HandlerBase::grabCnx() {
return;
}
auto self = shared_from_this();
client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
Expand Down
6 changes: 5 additions & 1 deletion lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,18 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {

virtual const std::string& getName() const = 0;

const std::string& topic() const { return *topic_; }
const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }

private:
const std::shared_ptr<std::string> topic_;

void handleDisconnection(Result result, const ClientConnectionPtr& cnx);

void handleTimeout(const boost::system::error_code& ec);

protected:
ClientImplWeakPtr client_;
const std::shared_ptr<std::string> topic_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
Expand Down
10 changes: 5 additions & 5 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
interceptors_(interceptors) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
<< "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]";
consumerStr_ = consumerStrStream.str();

if (conf.getUnAckedMessagesTimeoutMs() != 0) {
Expand Down Expand Up @@ -312,7 +312,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
}

void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing");

auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
Expand Down Expand Up @@ -483,7 +483,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
*numberTopicPartitions_ = 0;
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
<< " topic" << topic() << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
Expand Down Expand Up @@ -518,7 +518,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->topic_);
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());

Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
Expand Down Expand Up @@ -744,7 +744,7 @@ Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCrea
}
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }

const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); }

const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }

Expand Down
14 changes: 7 additions & 7 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
producerStr_("[" + *topic_ + ", " + producerName_ + "] "),
producerStr_("[" + topic() + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
batchTimer_(executor_->createDeadlineTimer()),
Expand All @@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
memoryLimitController_(client->getMemoryLimitController()),
chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic()
<< " id: " << producerId_);

int64_t initialSequenceId = conf.getInitialSequenceId();
Expand All @@ -93,7 +93,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,

if (conf_.isEncryptionEnabled()) {
std::ostringstream logCtxStream;
logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]";
logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]";
std::string logCtx = logCtxStream.str();
msgCrypto_ = std::make_shared<MessageCrypto>(logCtx, true);
msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
Expand Down Expand Up @@ -123,7 +123,7 @@ ProducerImpl::~ProducerImpl() {
}
}

const std::string& ProducerImpl::getTopic() const { return *topic_; }
const std::string& ProducerImpl::getTopic() const { return topic(); }

const std::string& ProducerImpl::getProducerName() const { return producerName_; }

Expand All @@ -148,7 +148,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();

SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId,
SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId,
conf_.getProperties(), conf_.getSchema(), epoch_,
userProvidedProducerName_, conf_.isEncryptionEnabled(),
static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()),
Expand Down Expand Up @@ -218,7 +218,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + *topic_ + ", " + producerName_ + "] ";
producerStr_ = "[" + topic() + ", " + producerName_ + "] ";
topicEpoch = responseData.topicEpoch;

if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
Expand Down Expand Up @@ -788,7 +788,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) {

return;
}
LOG_INFO(getName() << "Closing producer for topic " << topic_);
LOG_INFO(getName() << "Closing producer for topic " << topic());
state_ = Closing;

ClientConnectionPtr cnx = getCnx().lock();
Expand Down

0 comments on commit 7cefe0e

Please sign in to comment.