diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index a9c16536..5490551a 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -236,15 +236,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat return; } - if (partitionMetadata->getPartitions() > 0) { - LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString()); - callback(ResultOperationNotSupported, Reader()); - return; - } - - ReaderImplPtr reader = std::make_shared(shared_from_this(), topicName->toString(), conf, + ReaderImplPtr reader = std::make_shared(shared_from_this(), topicName->toString(), + partitionMetadata->getPartitions(), conf, getListenerExecutorProvider()->get(), callback); - ConsumerImplBasePtr consumer = reader->getConsumer().lock(); + ConsumerImplBasePtr consumer = reader->getConsumer(); auto self = shared_from_this(); reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) { auto consumer = weakConsumerPtr.lock(); diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6437472d..b9292c04 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase { void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; uint64_t getNumberOfConnectedConsumer() override; + void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override; virtual void disconnectConsumer(); Result fetchSingleMessageFromBroker(Message& msg); @@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase { virtual void redeliverMessages(const std::set& messageIds); virtual bool isReadCompacted(); - virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback); void beforeConnectionChange(ClientConnection& cnx) override; protected: diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index 9cf63a38..73c9161b 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -29,6 +29,7 @@ #include "HandlerBase.h" namespace pulsar { +typedef std::function HasMessageAvailableCallback; class ConsumerImplBase; using ConsumerImplBaseWeakPtr = std::weak_ptr; class OpBatchReceive { @@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this virtual uint64_t getNumberOfConnectedConsumer() = 0; // overrided methods from HandlerBase virtual const std::string& getName() const override = 0; + virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0; protected: // overrided methods from HandlerBase diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 7410b749..39da4a63 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -40,16 +40,20 @@ using namespace pulsar; MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - LookupServicePtr lookupServicePtr) + LookupServicePtr lookupServicePtr, + const Commands::SubscriptionMode subscriptionMode, + boost::optional startMessageId) : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf, - lookupServicePtr) { + lookupServicePtr, subscriptionMode, startMessageId) { topicsPartitions_[topicName->toString()] = numPartitions; } MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics, const std::string& subscriptionName, TopicNamePtr topicName, const ConsumerConfiguration& conf, - LookupServicePtr lookupServicePtr) + LookupServicePtr lookupServicePtr, + const Commands::SubscriptionMode subscriptionMode, + boost::optional startMessageId) : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics", Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf, client->getListenerExecutorProvider()->get()), @@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std messageListener_(conf.getMessageListener()), lookupServicePtr_(lookupServicePtr), numberTopicPartitions_(std::make_shared>(0)), - topics_(topics) { + topics_(topics), + subscriptionMode_(subscriptionMode), + startMessageId_(startMessageId) { std::stringstream consumerStrStream; consumerStrStream << "[Muti Topics Consumer: " << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]"; @@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN // We don't have to add partition-n suffix consumer = std::make_shared(client, topicName->toString(), subscriptionName_, config, topicName->isPersistent(), internalListenerExecutor, true, - NonPartitioned); + NonPartitioned, subscriptionMode_, startMessageId_); consumer->getConsumerCreatedFuture().addListener(std::bind( &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); @@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN std::string topicPartitionName = topicName->getTopicPartitionName(i); consumer = std::make_shared(client, topicPartitionName, subscriptionName_, config, topicName->isPersistent(), internalListenerExecutor, - true, Partitioned); + true, Partitioned, subscriptionMode_, startMessageId_); consumer->getConsumerCreatedFuture().addListener(std::bind( &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(), std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); @@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis } void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { - callback(ResultOperationNotSupported); + msgId.getTopicName(); + auto optConsumer = consumers_.find(msgId.getTopicName()); + if (optConsumer) { + unAckedMessageTrackerPtr_->removeMessagesTill(msgId); + optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback); + } } void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) { @@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept { partitionsUpdateTimer_->cancel(ec); } } + +void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) { + if (incomingMessagesSize_ > 0) { + callback(ResultOk, true); + return; + } + + auto hasMessageAvailable = std::make_shared>(); + auto needCallBack = std::make_shared>(consumers_.size()); + auto self = get_shared_this_ptr(); + + consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) { + consumer->hasMessageAvailableAsync( + [self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) { + if (result != ResultOk) { + LOG_ERROR("Filed when acknowledge list: " << result); + // set needCallBack is -1 to avoid repeated callback. + needCallBack->store(-1); + callback(result, false); + return; + } + + if (hasMsg) { + hasMessageAvailable->store(hasMsg); + } + + if (--(*needCallBack) == 0) { + callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0); + } + }); + }); +} diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 50cdecf3..32ec750c 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -25,6 +25,7 @@ #include #include "BlockingQueue.h" +#include "Commands.h" #include "ConsumerImplBase.h" #include "Future.h" #include "Latch.h" @@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { public: MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions, const std::string& subscriptionName, const ConsumerConfiguration& conf, - LookupServicePtr lookupServicePtr); + LookupServicePtr lookupServicePtr, + const Commands::SubscriptionMode = Commands::SubscriptionModeDurable, + boost::optional startMessageId = boost::none); + MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector& topics, const std::string& subscriptionName, TopicNamePtr topicName, - const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_); + const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_, + const Commands::SubscriptionMode = Commands::SubscriptionModeDurable, + boost::optional startMessageId = boost::none); ~MultiTopicsConsumerImpl(); // overrided methods from ConsumerImplBase @@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void negativeAcknowledge(const MessageId& msgId) override; bool isConnected() const override; uint64_t getNumberOfConnectedConsumer() override; + void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override; void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); @@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; const std::vector topics_; std::queue pendingReceives_; + const Commands::SubscriptionMode subscriptionMode_; + boost::optional startMessageId_; /* methods */ void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, @@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); + FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); }; typedef std::shared_ptr MultiTopicsConsumerImplPtr; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index da1d95ec..a06c0ec1 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -23,6 +23,7 @@ #include "ConsumerImpl.h" #include "ExecutorService.h" #include "GetLastMessageIdResponse.h" +#include "MultiTopicsConsumerImpl.h" #include "TopicName.h" namespace pulsar { @@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader; static ResultCallback emptyCallback; -ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf, - const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback) - : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {} +ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions, + const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor, + ReaderCallback readerCreatedCallback) + : topic_(topic), + partitions_(partitions), + client_(client), + readerConf_(conf), + readerCreatedCallback_(readerCreatedCallback) {} void ReaderImpl::start(const MessageId& startMessageId, std::function callback) { @@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId, test::consumerConfigOfReader = consumerConf.clone(); } - consumer_ = std::make_shared( - client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(), - ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId); - consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_)); + if (partitions_ > 0) { + auto consumerImpl = std::make_shared( + client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf, + client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId); + consumer_ = consumerImpl; + } else { + auto consumerImpl = std::make_shared( + client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(), + ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, + startMessageId); + consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_)); + consumer_ = consumerImpl; + } auto self = shared_from_this(); consumer_->getConsumerCreatedFuture().addListener( [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) { diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h index e216241d..3731990d 100644 --- a/lib/ReaderImpl.h +++ b/lib/ReaderImpl.h @@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader; class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this { public: - ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf, - const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback); + ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions, + const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor, + ReaderCallback readerCreatedCallback); void start(const MessageId& startMessageId, std::function callback); @@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this Future getReaderCreatedFuture(); - ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; } + ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; } void hasMessageAvailableAsync(HasMessageAvailableCallback callback); @@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this void acknowledgeIfNecessary(Result result, const Message& msg); std::string topic_; + int partitions_; ClientImplWeakPtr client_; ReaderConfiguration readerConf_; - ConsumerImplPtr consumer_; + ConsumerImplBasePtr consumer_; ReaderCallback readerCreatedCallback_; ReaderListener readerListener_; }; diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h index 0bdcc852..1453460c 100644 --- a/lib/UnAckedMessageTrackerEnabled.h +++ b/lib/UnAckedMessageTrackerEnabled.h @@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); + FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); }; } // namespace pulsar diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index f3c8abbe..ac971b3b 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) { ASSERT_EQ(0, result.size()); } +TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) { + Client client(lookupUrl); + + const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr)); + const std::string subName = "sub"; + + int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions", + std::to_string(2)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + Consumer consumer; + ConsumerConfiguration consumerConfiguration; + consumerConfiguration.setUnAckedMessagesTimeoutMs(10000); + ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer)); + + Producer producer; + ProducerConfiguration producerConfiguration; + producerConfiguration.setBatchingEnabled(false); + producerConfiguration.setPartitionsRoutingMode( + ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer)); + + const int numMessages = 100; + for (int i = 0; i < numMessages; ++i) { + Message msg = MessageBuilder().setContent(std::to_string(i)).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + Message msg; + for (int i = 0; i < numMessages; i++) { + ASSERT_EQ(ResultOk, consumer.receive(msg)); + // The last message of each partition topic be ACK + if (i >= numMessages - 2) { + consumer.acknowledgeCumulative(msg.getMessageId()); + } + } + ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000)); + + // Assert that there is no message in the tracker. + auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + auto tracker = + static_cast(multiConsumerImpl->unAckedMessageTrackerPtr_.get()); + ASSERT_EQ(0, tracker->size()); + + client.close(); +} + TEST(ConsumerTest, consumerNotInitialized) { Consumer consumer; diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index aa7737d0..1ef1ee7b 100644 --- a/tests/PulsarFriend.h +++ b/tests/PulsarFriend.h @@ -99,7 +99,7 @@ class PulsarFriend { } static ConsumerImplPtr getConsumer(Reader reader) { - return std::static_pointer_cast(reader.impl_->getConsumer().lock()); + return std::static_pointer_cast(reader.impl_->getConsumer()); } static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; } diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index eefe1bcf..afef384d 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -37,10 +37,28 @@ using namespace pulsar; static std::string serviceUrl = "pulsar://localhost:6650"; static const std::string adminUrl = "http://localhost:8080/"; -TEST(ReaderTest, testSimpleReader) { +class ReaderTest : public ::testing::TestWithParam { + public: + void initTopic(std::string topicName) { + if (isMultiTopic_) { + // call admin api to make it partitioned + std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions"; + int res = makePutRequest(url, "5"); + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + } + } + + protected: + bool isMultiTopic_ = GetParam(); +}; + +TEST_P(ReaderTest, testSimpleReader) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/test-simple-reader"; + std::string topicName = + "test-simple-reader" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); ReaderConfiguration readerConf; Reader reader; @@ -69,10 +87,11 @@ TEST(ReaderTest, testSimpleReader) { client.close(); } -TEST(ReaderTest, testAsyncRead) { +TEST_P(ReaderTest, testAsyncRead) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr)); + std::string topicName = "testAsyncRead" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); ReaderConfiguration readerConf; Reader reader; @@ -113,10 +132,12 @@ TEST(ReaderTest, testAsyncRead) { client.close(); } -TEST(ReaderTest, testReaderAfterMessagesWerePublished) { +TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderAfterMessagesWerePublished"; + std::string topicName = "testReaderAfterMessagesWerePublished" + std::to_string(time(nullptr)) + + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -145,10 +166,12 @@ TEST(ReaderTest, testReaderAfterMessagesWerePublished) { client.close(); } -TEST(ReaderTest, testMultipleReaders) { +TEST_P(ReaderTest, testMultipleReaders) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testMultipleReaders"; + std::string topicName = + "testMultipleReaders" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -190,10 +213,12 @@ TEST(ReaderTest, testMultipleReaders) { client.close(); } -TEST(ReaderTest, testReaderOnLastMessage) { +TEST_P(ReaderTest, testReaderOnLastMessage) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderOnLastMessage"; + std::string topicName = + "testReaderOnLastMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -228,10 +253,12 @@ TEST(ReaderTest, testReaderOnLastMessage) { client.close(); } -TEST(ReaderTest, testReaderOnSpecificMessage) { +TEST_P(ReaderTest, testReaderOnSpecificMessage) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderOnSpecificMessage"; + std::string topicName = + "testReaderOnSpecificMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); @@ -277,12 +304,15 @@ TEST(ReaderTest, testReaderOnSpecificMessage) { } /** - * Test that we can position on a particular message even within a batch + * build, file MessageIdBuilder.cc, line 45.?? Test that we can position on a particular message even within a + * batch */ -TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) { +TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderOnSpecificMessageWithBatches"; + std::string topicName = "testReaderOnSpecificMessageWithBatches" + std::to_string(time(nullptr)) + + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; // Enable batching @@ -339,10 +369,12 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) { client.close(); } -TEST(ReaderTest, testReaderReachEndOfTopic) { +TEST_P(ReaderTest, testReaderReachEndOfTopic) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderReachEndOfTopic"; + std::string topicName = + "testReaderReachEndOfTopic" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); // 1. create producer Producer producer; @@ -414,10 +446,12 @@ TEST(ReaderTest, testReaderReachEndOfTopic) { client.close(); } -TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { +TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testReaderReachEndOfTopicMessageWithBatches"; + std::string topicName = "testReaderReachEndOfTopicMessageWithoutBatches" + std::to_string(time(nullptr)) + + std::to_string(isMultiTopic_); + initTopic(topicName); // 1. create producer Producer producer; @@ -510,10 +544,12 @@ TEST(ReaderTest, testPartitionIndex) { client.close(); } -TEST(ReaderTest, testSubscriptionNameSetting) { +TEST_P(ReaderTest, testSubscriptionNameSetting) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/test-subscription-name-setting"; + std::string topicName = + "testSubscriptionNameSetting" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); std::string subName = "test-sub"; ReaderConfiguration readerConf; @@ -527,10 +563,12 @@ TEST(ReaderTest, testSubscriptionNameSetting) { client.close(); } -TEST(ReaderTest, testSetSubscriptionNameAndPrefix) { +TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix"; + std::string topicName = + "testSetSubscriptionNameAndPrefix" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); std::string subName = "test-sub"; ReaderConfiguration readerConf; @@ -545,10 +583,12 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) { client.close(); } -TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) { +TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) { Client client(serviceUrl); - std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail"; + std::string topicName = "testMultiSameSubscriptionNameReaderShouldFail" + std::to_string(time(nullptr)) + + std::to_string(isMultiTopic_); + initTopic(topicName); std::string subscriptionName = "test-sub"; ReaderConfiguration readerConf1; @@ -567,28 +607,33 @@ TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) { client.close(); } -TEST(ReaderTest, testIsConnected) { - const std::string topic = "testReaderIsConnected-" + std::to_string(time(nullptr)); +TEST_P(ReaderTest, testIsConnected) { Client client(serviceUrl); + std::string topicName = "testIsConnected" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); + Reader reader; ASSERT_FALSE(reader.isConnected()); - ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader)); ASSERT_TRUE(reader.isConnected()); ASSERT_EQ(ResultOk, reader.close()); ASSERT_FALSE(reader.isConnected()); } -TEST(ReaderTest, testHasMessageAvailableWhenCreated) { - const std::string topic = "testHasMessageAvailableWhenCreated-" + std::to_string(time(nullptr)); +TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) { Client client(serviceUrl); + std::string topicName = + "testHasMessageAvailableWhenCreated" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); + ProducerConfiguration producerConf; producerConf.setBatchingMaxMessages(3); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer)); std::vector messageIds; constexpr int numMessages = 7; @@ -612,25 +657,28 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) { bool hasMessageAvailable; for (size_t i = 0; i < messageIds.size() - 1; i++) { - ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader)); ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); EXPECT_TRUE(hasMessageAvailable); } // The start message ID is exclusive by default, so when we start at the last message, there should be no // message available. - ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader)); ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable)); EXPECT_FALSE(hasMessageAvailable); client.close(); } -TEST(ReaderTest, testReceiveAfterSeek) { +TEST_P(ReaderTest, testReceiveAfterSeek) { Client client(serviceUrl); - const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr)); + + std::string topicName = + "testReceiveAfterSeek" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_); + initTopic(topicName); Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); MessageId seekMessageId; for (int i = 0; i < 5; i++) { @@ -642,7 +690,7 @@ TEST(ReaderTest, testReceiveAfterSeek) { } Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader)); + ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader)); reader.seek(seekMessageId); @@ -651,3 +699,5 @@ TEST(ReaderTest, testReceiveAfterSeek) { client.close(); } + +INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); \ No newline at end of file