-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cpp client: add multiTopicsConsumer #1996
Conversation
@merlimat @ivankelly can you review this? we need this in 2.1 |
Then we should also have the Python, C and Go versions |
I guess this will take extra weeks to get those completed. Are you suggesting dropping this for 2.1 release? |
@@ -99,6 +99,15 @@ class Client { | |||
void subscribeAsync(const std::string& topic, const std::string& consumerName, | |||
const ConsumerConfiguration& conf, SubscribeCallback callback); | |||
|
|||
Result subscribe(const std::vector<std::string>& topics, const std::string& consumerName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumerName
-> subscriptionName
/** | ||
* Only for MultiTopicsConsumer to set a valid topicName | ||
*/ | ||
void setTopicName(std::string topicName) const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be exposed in API.
/** | ||
* Only for MultiTopicsConsumer to get a valid topicName | ||
*/ | ||
std::string& getTopicName() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return type should be const std::string&
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a single topic, we should have the information as well, so we should restrict this method only for the MultiTopicsConsumer
case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @merlimat. Currently, as you suggested, move set/getTopicName into private. Is it enough? e.g. single ConsumerImpl
could call these 2 methods, but It is not useful at all, maybe no developer will use it inside ComsumerImpl
.
Or we could use this way to make set/getTopicName only available to MultiTopicsConsumer?
/** | ||
* Only for MultiTopicsConsumer to set a valid topicName | ||
*/ | ||
void setTopicName(std::string topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be in public API
/** | ||
* Only for MultiTopicsConsumer to get a valid topicName | ||
*/ | ||
std::string& getTopicName() const; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wouldn't expose this in public API for MessageId
. It's useful for Message
but, unless strictly needed, I'd leave it out from MessasgeId
interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, Seems need it. in acknowledgeAsync, we use getTopicName(), then use the topicName to find which sub-consumer to do the ack. Also, in UnAckedMessageTracker to remove MessageIds that belongs to a sub-consumer.
will move this into private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I understand that we need it internally, but we should be careful to expose in API. This method could only be exposed in MessageIdImpl
|
||
class MessageIdImpl { | ||
public: | ||
MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1) {} | ||
MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1), topicName_(emptyString) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to use emptyString
. initializing with topicName_()
will have same effect
pulsar-client-cpp/lib/MessageImpl.cc
Outdated
|
||
MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0) {} | ||
MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(emptyString) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, no need for emptyString
Most users will access this feature through either Python (or Go) bindings, rather than using straight C++ API. I think that it would be fine to plan for 2.2 and include bindings as well. |
okay. moved to 2.2 |
64b5d06
to
7e43452
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't gone through thoroughly. I'll give it another pass tomorrow.
pulsar-client-cpp/lib/ClientImpl.cc
Outdated
lock.unlock(); | ||
callback(ResultAlreadyClosed, Consumer()); | ||
return; | ||
} else if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assignment is very hidden. I would move the call to topicNamesValid to just after the lock.
|
||
DECLARE_LOG_OBJECT() | ||
|
||
namespace pulsar { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using namespace pulsar;
Will avoid the braces. applies for whole file. Don't do it in a header though.
const LookupServicePtr lookupServicePtr) | ||
: client_(client), | ||
subscriptionName_(subscriptionName), | ||
topic_(topicName ? topicName->toString() : "EmptyTopics"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You've already got special handling to avoid empty topicName in another place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, no, not handling it in clientimpl. It is possible we subscribe on empty topics when pattern subscribe added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
void MultiTopicsConsumerImpl::start() { | ||
if (topics_.empty()) { | ||
setState(Ready); | ||
LOG_INFO("No topics passed in when create MultiTopicsConsumer."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
INFO may be a bit loud for this.
} | ||
} | ||
|
||
void MultiTopicsConsumerImpl::handleOneTopicSubscribe(Result result, Consumer consumer, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a callback, but the name doesn't indicate it.
handleOneTopicSubscribe -> handleOneTopicSubscribed
lock.unlock(); | ||
allTopicPartitionsNumber_->fetch_add(partitionsNumber); | ||
|
||
boost::shared_ptr<std::atomic<int>> atomicIntPtr = boost::make_shared<std::atomic<int>>(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
atomicIntPtr says nothing about what this is for. Also, you can just initialize with the number of partition, and decrement each time. This would avoid you having to pass partitionsNumber to the callback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same for the similar patternin topics.
} | ||
} | ||
|
||
void MultiTopicsConsumerImpl::handleUnsubscribeAsync(Result result, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> handleUnsubscribed
} | ||
} | ||
|
||
void MultiTopicsConsumerImpl::handleUnsubscribeOneTopicAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> handleOneTopicUnsubscribed
a55dd4e
to
7993d52
Compare
@ivankelly, Thanks for the comments, change it accordingly. |
#include <algorithm> | ||
#include <numeric> | ||
|
||
namespace pulsar { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be changed to using namespace pulsar
also
pulsar-client-cpp/lib/MessageImpl.h
Outdated
|
||
const std::string& getPartitionKey() const; | ||
bool hasPartitionKey() const; | ||
|
||
uint64_t getPublishTimestamp() const; | ||
uint64_t getEventTimestamp() const; | ||
|
||
/** | ||
* Only for MultiTopicsConsumer to get a valid topicName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no reason this can't work for non-multitopic consumers also. In fact, there was a request for this from a user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, right, I found that request. cpp work tracked in issue #2190
bool MultiTopicsBrokerConsumerStatsImpl::isValid() const { | ||
bool isValid = true; | ||
for (int i = 0; i < statsList_.size(); i++) { | ||
isValid &= statsList_[i].isValid(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while technically this will work, it's strange to use a bitwise operator here.
boolean operators should be used with bools.
isValid = isValid && statsLists_[i].isValid();
return false; | ||
} | ||
|
||
bool isValid = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate code with isValid function above.
virtual uint64_t getMsgBacklog() const; | ||
|
||
/** Returns the BrokerConsumerStatsImpl at of ith partition */ | ||
BrokerConsumerStats getBrokerConsumerStats(int index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the functions belong and including this one be private?
assert(previous > 0); | ||
|
||
if (result != ResultOk) { | ||
state_ = Failed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state shouldn't be set here. It will be set when the failure on the promise propagates up.
|
||
if (partitionsNeedCreate->load() == 0) { | ||
topicSubResultPromise->setValue(Consumer(shared_from_this())); | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dead code.
<< result << " subscription - " << subscriptionName_); | ||
} | ||
|
||
if (consumerUnsubed->load() == allTopicPartitionsNumber_->load()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename allTopicPartitionsNumber_ to numTopicPartitions_
// subscribe for passed in topic | ||
Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const std::string& topic) { | ||
TopicNamePtr topicName; | ||
ConsumerSubResultPromisePtr topicPromise = boost::make_shared<Promise<Result, Consumer>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't there be a check that the topic is in the same namespace as preexisting topics?
std::string url3 = | ||
adminUrl + "admin/persistent/prop/unit/ns3/testMultiTopicsConsumerDifferentNamespace3/partitions"; | ||
|
||
int res = makePutRequest(url1, "2"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In form these tests are verify similar to the integration tests (i.e. Pulsar is running somewhere else). They should be able to use the commandline for doing stuff like this. Not something for this patch, but something to think about.
@@ -22,13 +22,12 @@ | |||
#include <iosfwd> | |||
#include <stdint.h> | |||
#include <boost/shared_ptr.hpp> | |||
#include <lib/MessageIdImpl.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot include lib/*.h
headers from public API. We need to just do the forward declaration
pulsar-client-cpp/lib/ClientImpl.cc
Outdated
|
||
consumer->getConsumerCreatedFuture().addListener( | ||
boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer)); | ||
Lock lock(mutex_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're acquiring the mutex above as well, I'd rather have a single continuous section with the mutex
pulsar-client-cpp/lib/MessageImpl.cc
Outdated
@@ -78,4 +78,12 @@ void MessageImpl::setPartitionKey(const std::string& partitionKey) { | |||
} | |||
|
|||
void MessageImpl::setEventTimestamp(uint64_t eventTimestamp) { metadata.set_event_time(eventTimestamp); } | |||
|
|||
void MessageImpl::setTopicName(std::string topicName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const std::string&
pulsar-client-cpp/lib/MessageImpl.h
Outdated
@@ -43,13 +43,24 @@ class MessageImpl { | |||
SharedBuffer payload; | |||
MessageId messageId; | |||
ClientConnection* cnx_; | |||
std::string topicName_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an optimization, to avoid to create (and copy) the topic string object for each message id, we could use a std::string*
to store the name. This would work if we pass a reference of the topic name held by the consumer instance, which will be valid.
The getTopicName()
would still have the same signature.
return isValid; | ||
} | ||
|
||
const std::string MultiTopicsBrokerConsumerStatsImpl::getAddress() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove const
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. @merlimat, the const in const std::string
, or const {)
? not quite understand the reason of remove const?
return str; | ||
} | ||
|
||
const ConsumerType MultiTopicsBrokerConsumerStatsImpl::getType() const { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove const
} | ||
|
||
const std::string MultiTopicsBrokerConsumerStatsImpl::getAddress() const { | ||
std::string str; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of concatenating a string with +=
, we should use std::stringstream
pulsar-client-cpp/lib/TopicName.cc
Outdated
@@ -224,4 +224,7 @@ const std::string TopicName::getTopicPartitionName(unsigned int partition) { | |||
topicPartitionName << toString() << PartitionedProducerImpl::PARTITION_NAME_SUFFIX << partition; | |||
return topicPartitionName.str(); | |||
} | |||
|
|||
boost::shared_ptr<NamespaceName> TopicName::getNamespaceName() { return namespaceName_; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NamespaceNamePtr
7993d52
to
f53c6cf
Compare
@merlimat @ivankelly Thanks for the comments, It is updated, please help review it. |
retest this please for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm #shipit
e1cf0cf
to
c46e395
Compare
retest this please |
In PR #1103, we add Multi-Topics-Consumer in java client. This is a catch up work to add it in cpp client.
merged as 500e393 in branch-2.1 |
In PR #1103, we add Multi-Topics-Consumer in java client. This is a catch up work to add it in cpp client.