Skip to content

Commit

Permalink
Fix topic not shown correctly in the consumer string (apache#329)
Browse files Browse the repository at this point in the history
### Motivation

`ConsumerImpl::getName()` returns a string that is used in logs to
represent the consumer. However, the topic part does not show correctly:

```
ConsumerImpl:283 | [0x6000001c88b8, consumer-1, 0] Created consumer on broker [127.0.0.1:60399 -> 127.0.0.1:6650]
```

It's because after apache#218,
the `ConsumerImpl::topic_` field becomes `std::shared_ptr` rather than a
`std::string` but it is still used to construct the `consumerStr_`.

### Modifications

Construct the `consumerStr_` using the `topic` argument in the
constructor and make `consumerStr_` const because it is never changed.

Now the logs will be like:

```
ConsumerImpl:280 | [persistent://public/default/my-topic, consumer-1, 0] Created consumer on broker [127.0.0.1:60647 -> 127.0.0.1:6650]
```
  • Loading branch information
BewareMyPower authored Oct 16, 2023
1 parent 5c77648 commit 33085eb
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 7 deletions.
5 changes: 1 addition & 4 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
receiverQueueRefillThreshold_(config_.getReceiverQueueSize() / 2),
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
consumerStr_("[" + topic + ", " + subscriptionName + ", " + std::to_string(consumerId_) + "] "),
messageListenerRunning_(true),
negativeAcksTracker_(client, *this, conf),
readCompacted_(conf.isReadCompacted()),
Expand All @@ -92,10 +93,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
interceptors_(interceptors) {
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();

// Initialize un-ACKed messages OT tracker.
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
if (conf.getTickDurationInMs() > 0) {
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class ConsumerImpl : public ConsumerImplBase {
const int receiverQueueRefillThreshold_;
uint64_t consumerId_;
std::string consumerName_;
std::string consumerStr_;
const std::string consumerStr_;
int32_t partitionIndex_ = -1;
Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
std::atomic_bool messageListenerRunning_;
Expand Down
2 changes: 1 addition & 1 deletion lib/MessageCrypto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace pulsar {

DECLARE_LOG_OBJECT()

MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded)
MessageCrypto::MessageCrypto(const std::string& logCtx, bool keyGenNeeded)
: dataKeyLen_(32),
dataKey_(new unsigned char[dataKeyLen_]),
tagLen_(16),
Expand Down
2 changes: 1 addition & 1 deletion lib/MessageCrypto.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MessageCrypto {
typedef std::map<std::string, std::string> StringMap;
typedef std::map<std::string, std::pair<std::string, boost::posix_time::ptime>> DataKeyCacheMap;

MessageCrypto(std::string& logCtx, bool keyGenNeeded);
MessageCrypto(const std::string& logCtx, bool keyGenNeeded);
~MessageCrypto();

/*
Expand Down

0 comments on commit 33085eb

Please sign in to comment.