From 25ee4209e8500f9ce19fdbbdf1e819ff669129ee Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 17 May 2024 16:40:27 +0800 Subject: [PATCH 1/6] Support seek operation on a multi-topics consumer ### Motivation See https://github.com/apache/pulsar-client-python/issues/213 ### Modifications Add a new `forEachValue` overload that allows users to count the number of rest running tasks through `SharedFuture` to `SynchronizedHashMap`. Leverage this overload in seek operations when the argument is a timestamp, or a MessageId that represents earliest or latest. When the argument is a MessageId whose `getTopicName()` method returns a correct topic name, seek on the internal consumer of that topic. Add `testMultiTopicsSeekAll` and `testMultiTopicsSeekSingle` to `ConsumerSeekTest` to cover these cases. --- lib/MultiTopicsConsumerImpl.cc | 135 ++++++++++------------ lib/MultiTopicsConsumerImpl.h | 54 ++++++++- lib/SynchronizedHashMap.h | 67 ++++++++++- tests/ConsumerSeekTest.cc | 199 +++++++++++++++++++++++++++++++++ tests/ConsumerTest.cc | 65 ----------- 5 files changed, 371 insertions(+), 149 deletions(-) create mode 100644 tests/ConsumerSeekTest.cc diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 80566c86..6acc683d 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -338,41 +338,23 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) } state_ = Closing; - std::shared_ptr> consumerUnsubed = std::make_shared>(0); auto self = get_shared_this_ptr(); - int numConsumers = 0; consumers_.forEachValue( - [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) { - numConsumers++; - consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) { - self->handleUnsubscribedAsync(result, consumerUnsubed, callback); + [this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) { + consumer->unsubscribeAsync([this, self, callback, future](Result result) { + if (result != ResultOk) { + state_ = Failed; + LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " + << result << " subscription - " << subscriptionName_); + } + if (future.tryComplete()) { + LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " + << consumerStr_); + callback((state_ != Failed) ? ResultOk : ResultUnknownError); + } }); - }); - if (numConsumers == 0) { - // No need to unsubscribe, since the list matching the regex was empty - callback(ResultOk); - } -} - -void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, - std::shared_ptr> consumerUnsubed, - ResultCallback callback) { - (*consumerUnsubed)++; - - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " - << result << " subscription - " << subscriptionName_); - } - - if (consumerUnsubed->load() == numberTopicPartitions_->load()) { - LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_); - Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError; - // The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if - // unsubscribe succeeds. - callback(result1); - return; - } + }, + [callback] { callback(ResultOk); }); } void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) { @@ -841,24 +823,26 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } + Lock lock(mutex_); MultiTopicsBrokerConsumerStatsPtr statsPtr = std::make_shared(numberTopicPartitions_->load()); - LatchPtr latchPtr = std::make_shared(numberTopicPartitions_->load()); lock.unlock(); - size_t i = 0; - consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) { - size_t index = i++; - auto weakSelf = weak_from_this(); - consumer->getBrokerConsumerStatsAsync( - [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) { + // TODO: fix the thread safety issue if numberTopicPartitions_ was changed here + consumers_.forEachValue( + [this, statsPtr, &i, callback](const ConsumerImplPtr& consumer, SharedFuture future) { + size_t index = i++; + auto weakSelf = weak_from_this(); + consumer->getBrokerConsumerStatsAsync([this, weakSelf, future, statsPtr, index, callback]( + Result result, BrokerConsumerStats stats) { auto self = weakSelf.lock(); if (self) { - handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback); + handleGetConsumerStats(result, stats, future, statsPtr, index, callback); } }); - }); + }, + [callback] { callback(ResultOk, BrokerConsumerStats{}); }); } void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { @@ -866,19 +850,20 @@ void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallba } void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, - LatchPtr latchPtr, + SharedFuture future, MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index, BrokerConsumerStatsCallback callback) { Lock lock(mutex_); + bool completed = false; if (res == ResultOk) { - latchPtr->countdown(); + completed = future.tryComplete(); statsPtr->add(brokerConsumerStats, index); } else { lock.unlock(); callback(res, BrokerConsumerStats()); return; } - if (latchPtr->getCount() == 0) { + if (completed) { lock.unlock(); callback(ResultOk, BrokerConsumerStats(statsPtr)); } @@ -899,50 +884,50 @@ std::shared_ptr MultiTopicsConsumerImpl::topicNamesValid(const std::v return topicNamePtr; } -void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - callback(ResultOperationNotSupported); -} - -void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - if (state_ != Ready) { - callback(ResultAlreadyClosed); - return; - } - +void MultiTopicsConsumerImpl::beforeSeek() { duringSeek_.store(true, std::memory_order_release); consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); }); unAckedMessageTrackerPtr_->clear(); incomingMessages_.clear(); incomingMessagesSize_ = 0L; +} + +void MultiTopicsConsumerImpl::afterSeek() { + duringSeek_.store(false, std::memory_order_release); + auto self = get_shared_this_ptr(); + listenerExecutor_->postWork([this, self] { + consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); }); + }); +} + +void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { + if (msgId == MessageId::earliest() || msgId == MessageId::latest()) { + return seekAllAsync(msgId, callback); + } + + auto optConsumer = consumers_.find(msgId.getTopicName()); + if (!optConsumer) { + callback(ResultOperationNotSupported); + return; + } + beforeSeek(); auto weakSelf = weak_from_this(); - auto numConsumersLeft = std::make_shared>(consumers_.size()); - auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) { + optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) { auto self = weakSelf.lock(); - if (PULSAR_UNLIKELY(!self)) { - callback(result); - return; - } - if (result != ResultOk) { - *numConsumersLeft = 0; // skip the following callbacks + if (self) { + afterSeek(); callback(result); - return; - } - if (--*numConsumersLeft > 0) { - return; + } else { + callback(ResultAlreadyClosed); } - duringSeek_.store(false, std::memory_order_release); - listenerExecutor_->postWork([this, self] { - consumers_.forEachValue( - [](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); }); - }); - callback(ResultOk); - }; - consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) { - consumer->seekAsync(timestamp, wrappedCallback); }); } +void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { + seekAllAsync(timestamp, callback); +} + void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) { consumer->setNegativeAcknowledgeEnabledForTesting(enabled); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index b5c51ec9..ce98dfcc 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -25,7 +25,7 @@ #include #include "Commands.h" -#include "ConsumerImplBase.h" +#include "ConsumerImpl.h" #include "ConsumerInterceptors.h" #include "Future.h" #include "Latch.h" @@ -38,7 +38,6 @@ namespace pulsar { typedef std::shared_ptr> ConsumerSubResultPromisePtr; -class ConsumerImpl; using ConsumerImplPtr = std::shared_ptr; class ClientImpl; using ClientImplPtr = std::shared_ptr; @@ -99,7 +98,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { uint64_t getNumberOfConnectedConsumer() override; void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override; - void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, + void handleGetConsumerStats(Result, BrokerConsumerStats, SharedFuture, MultiTopicsBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); // return first topic name when all topics name valid, or return null pointer static std::shared_ptr topicNamesValid(const std::vector& topics); @@ -152,8 +151,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, std::shared_ptr> partitionsNeedCreate, ConsumerSubResultPromisePtr topicSubResultPromise); - void handleUnsubscribedAsync(Result result, std::shared_ptr> consumerUnsubed, - ResultCallback callback); void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr> consumerUnsubed, int numberPartitions, TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback); @@ -179,6 +176,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { return std::static_pointer_cast(shared_from_this()); } + template +#if __cplusplus >= 202002L + requires std::convertible_to || + std::same_as>, MessageId> +#endif + void seekAllAsync(const SeekArg& seekArg, ResultCallback callback); + + void beforeSeek(); + void afterSeek(); + FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); @@ -187,5 +194,42 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { }; typedef std::shared_ptr MultiTopicsConsumerImplPtr; + +template +#if __cplusplus >= 202002L + requires std::convertible_to || + std::same_as>, MessageId> +#endif + inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) { + if (state_ != Ready) { + callback(ResultAlreadyClosed); + return; + } + beforeSeek(); + auto weakSelf = weak_from_this(); + auto failed = std::make_shared(false); + consumers_.forEachValue( + [this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) { + consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) { + auto self = weakSelf.lock(); + if (!self || failed->load(std::memory_order_acquire)) { + callback(result); + return; + } + if (result != ResultOk) { + failed->store(true, std::memory_order_release); // skip the following callbacks + afterSeek(); + callback(result); + return; + } + if (future.tryComplete()) { + afterSeek(); + callback(ResultOk); + } + }); + }, + [callback] { callback(ResultOk); }); +} + } // namespace pulsar #endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h index 082aeaf4..e224913b 100644 --- a/lib/SynchronizedHashMap.h +++ b/lib/SynchronizedHashMap.h @@ -18,8 +18,10 @@ */ #pragma once +#include #include #include +#include #include #include #include @@ -27,6 +29,16 @@ namespace pulsar { +class SharedFuture { + public: + SharedFuture(size_t size) : count_(std::make_shared(size)) {} + + bool tryComplete() const { return --*count_ == 0; } + + private: + std::shared_ptr count_; +}; + // V must be default constructible and copyable template class SynchronizedHashMap { @@ -60,10 +72,57 @@ class SynchronizedHashMap { } } - void forEachValue(std::function f) const { - Lock lock(mutex_); - for (const auto& kv : data_) { - f(kv.second); + template +#if __cplusplus >= 202002L + requires requires(ValueFunc&& each, const V& value) { + each(value); + } +#endif + void forEachValue(ValueFunc&& each) { + Lock lock{mutex_}; + for (auto&& kv : data_) { + each(kv.second); + } + } + + // This override provides a convenient approach to execute tasks on each consumer concurrently and + // supports checking if all tasks are done in the `each` callback. + // + // All map values will be passed as the 1st argument to the `each` function. The 2nd argument is a shared + // future whose `tryComplete` method marks this task as completed. If users want to check if all task are + // completed in the `each` function, this method must be called. + // + // For example, given a `SynchronizedHashMap` object `m` and the following call: + // + // ```c++ + // m.forEachValue([](const std::string& s, SharedFuture future) { + // std::cout << s << std::endl; + // if (future.tryComplete()) { + // std::cout << "done" << std::endl; + // } + // }, [] { std::cout << "empty map" << std::endl; }); + // ``` + // + // If the map is empty, only "empty map" will be printed. Otherwise, all values will be printed + // and "done" will be printed after that. + template +#if __cplusplus >= 202002L + requires requires(ValueFunc&& each, const V& value, SharedFuture count, EmptyFunc emptyFunc) { + each(value, count); + emptyFunc(); + } +#endif + void forEachValue(ValueFunc&& each, EmptyFunc&& emptyFunc) { + std::unique_lock lock{mutex_}; + if (data_.empty()) { + lock.unlock(); + emptyFunc(); + return; + } + SharedFuture future{data_.size()}; + for (auto&& kv : data_) { + const auto& value = kv.second; + each(value, future); } } diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc new file mode 100644 index 00000000..9b6378c4 --- /dev/null +++ b/tests/ConsumerSeekTest.cc @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include +#include +#include + +#include "HttpHelper.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +static const std::string lookupUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +extern std::string unique_str(); + +namespace pulsar { + +class ConsumerSeekTest : public ::testing::TestWithParam { + public: + void SetUp() override { client_ = Client{lookupUrl}; } + + void TearDown() override { client_.close(); } + + protected: + Client client_{lookupUrl}; + ProducerConfiguration producerConf_; + + std::vector initProducersForPartitionedTopic(const std::string& topic) { + constexpr int numPartitions = 3; + int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions", + std::to_string(numPartitions)); + if (res != 204 && res != 409) { + throw std::runtime_error("Failed to create partitioned topic: " + std::to_string(res)); + } + + std::vector producers(numPartitions); + for (int i = 0; i < numPartitions; i++) { + auto result = client_.createProducer(topic + "-partition-" + std::to_string(i), producers[i]); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Failed to create producer: "} + strResult(result)); + } + } + return producers; + } + + Consumer createConsumer(const std::string& topic) { + Consumer consumer; + ConsumerConfiguration conf; + conf.setStartMessageIdInclusive(GetParam()); + auto result = client_.subscribe(topic, "sub", conf, consumer); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Failed to subscribe: "} + strResult(result)); + } + return consumer; + } +}; + +TEST_P(ConsumerSeekTest, testSeekForMessageId) { + Client client(lookupUrl); + + const std::string topic = "test-seek-for-message-id-" + std::string((GetParam() ? "batch-" : "")) + + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + + Consumer consumerExclusive; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); + + Consumer consumerInclusive; + ASSERT_EQ(ResultOk, + client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), + consumerInclusive)); + + const auto numMessages = 100; + MessageId seekMessageId; + + int r = (rand() % (numMessages - 1)); + for (int i = 0; i < numMessages; i++) { + MessageId id; + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id)); + + if (i == r) { + seekMessageId = id; + } + } + + LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); + + consumerExclusive.seek(seekMessageId); + Message msg0; + ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000)); + + consumerInclusive.seek(seekMessageId); + Message msg1; + ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000)); + + LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from " << msg0.getMessageId()); + LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from " << msg1.getMessageId()); + + ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1)); + ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r)); + + consumerInclusive.close(); + consumerExclusive.close(); + producer.close(); +} + +TEST_P(ConsumerSeekTest, testMultiTopicsSeekAll) { + std::string topic = "consumer-seek-test-multi-topics-seek-all-" + unique_str(); + auto producers = initProducersForPartitionedTopic(topic); + auto consumer = createConsumer(topic); + const auto numPartitions = producers.size(); + + auto receive = [&consumer, numPartitions] { + std::set values; + for (int i = 0; i < numPartitions; i++) { + Message msg; + auto result = consumer.receive(msg, 3000); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Receive failed: "} + strResult(result)); + } + values.emplace(msg.getDataAsString()); + } + return values; + }; + + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-0").build()); + } + ASSERT_EQ(receive(), (std::set{"msg-0-0", "msg-1-0", "msg-2-0"})); + + // Seek to earliest + ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); + ASSERT_EQ(receive(), (std::set{"msg-0-0", "msg-1-0", "msg-2-0"})); + + // Seek to latest + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-1").build()); + } + ASSERT_EQ(ResultOk, consumer.seek(MessageId::latest())); + + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-2").build()); + } + ASSERT_EQ(receive(), (std::set{"msg-0-2", "msg-1-2", "msg-2-2"})); +} + +TEST_P(ConsumerSeekTest, testMultiTopicsSeekSingle) { + std::string topic = "consumer-seek-test-multi-topics-seek-single-" + unique_str(); + auto producers = initProducersForPartitionedTopic(topic); + auto consumer = createConsumer(topic); + + MessageId msgId; + producers[0].send(MessageBuilder().setContent("msg-0").build(), msgId); + ASSERT_EQ(ResultOperationNotSupported, consumer.seek(msgId)); + producers[0].send(MessageBuilder().setContent("msg-1").build(), msgId); + ASSERT_EQ(ResultOperationNotSupported, consumer.seek(msgId)); + + std::vector msgIds; + Message msg; + for (int i = 0; i < 2; i++) { + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + msgIds.emplace_back(msg.getMessageId()); + } + + ASSERT_EQ(ResultOk, consumer.seek(msgIds[0])); + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + if (GetParam()) { + ASSERT_EQ(msg.getMessageId(), msgIds[0]); + } else { + ASSERT_EQ(msg.getMessageId(), msgIds[1]); + } +} + +INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); + +} // namespace pulsar diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 2aab7229..f9840f97 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1136,69 +1136,6 @@ TEST(ConsumerTest, testPatternSubscribeTopic) { client.close(); } -class ConsumerSeekTest : public ::testing::TestWithParam { - public: - void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); } - - void TearDown() override { client_.close(); } - - protected: - Client client_{lookupUrl}; - ProducerConfiguration producerConf_; -}; - -TEST_P(ConsumerSeekTest, testSeekForMessageId) { - Client client(lookupUrl); - - const std::string topic = "test-seek-for-message-id-" + std::string((GetParam() ? "batch-" : "")) + - std::to_string(time(nullptr)); - - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); - - Consumer consumerExclusive; - ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); - - Consumer consumerInclusive; - ASSERT_EQ(ResultOk, - client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), - consumerInclusive)); - - const auto numMessages = 100; - MessageId seekMessageId; - - int r = (rand() % (numMessages - 1)); - for (int i = 0; i < numMessages; i++) { - MessageId id; - ASSERT_EQ(ResultOk, - producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id)); - - if (i == r) { - seekMessageId = id; - } - } - - LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); - - consumerExclusive.seek(seekMessageId); - Message msg0; - ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000)); - - consumerInclusive.seek(seekMessageId); - Message msg1; - ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000)); - - LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from " << msg0.getMessageId()); - LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from " << msg1.getMessageId()); - - ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1)); - ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r)); - - consumerInclusive.close(); - consumerExclusive.close(); - producer.close(); -} - TEST(ConsumerTest, testNegativeAcksTrackerClose) { Client client(lookupUrl); auto topicName = "testNegativeAcksTrackerClose"; @@ -1252,8 +1189,6 @@ TEST(ConsumerTest, testAckNotPersistentTopic) { client.close(); } -INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); - class InterceptorForNegAckDeadlock : public ConsumerInterceptor { public: Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; } From c8272f46b05df368c7572bed9ab68cb069043467 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 27 May 2024 20:09:30 +0800 Subject: [PATCH 2/6] Address comments --- lib/MultiTopicsConsumerImpl.cc | 28 +++++++++++++--------------- lib/MultiTopicsConsumerImpl.h | 2 +- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 6acc683d..83287a6c 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -823,26 +823,23 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal callback(ResultConsumerNotInitialized, BrokerConsumerStats()); return; } - Lock lock(mutex_); MultiTopicsBrokerConsumerStatsPtr statsPtr = std::make_shared(numberTopicPartitions_->load()); + LatchPtr latchPtr = std::make_shared(numberTopicPartitions_->load()); lock.unlock(); size_t i = 0; - // TODO: fix the thread safety issue if numberTopicPartitions_ was changed here - consumers_.forEachValue( - [this, statsPtr, &i, callback](const ConsumerImplPtr& consumer, SharedFuture future) { - size_t index = i++; - auto weakSelf = weak_from_this(); - consumer->getBrokerConsumerStatsAsync([this, weakSelf, future, statsPtr, index, callback]( - Result result, BrokerConsumerStats stats) { + consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) { + size_t index = i++; + auto weakSelf = weak_from_this(); + consumer->getBrokerConsumerStatsAsync( + [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) { auto self = weakSelf.lock(); if (self) { - handleGetConsumerStats(result, stats, future, statsPtr, index, callback); + handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback); } }); - }, - [callback] { callback(ResultOk, BrokerConsumerStats{}); }); + }); } void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) { @@ -850,20 +847,19 @@ void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallba } void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, - SharedFuture future, + LatchPtr latchPtr, MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index, BrokerConsumerStatsCallback callback) { Lock lock(mutex_); - bool completed = false; if (res == ResultOk) { - completed = future.tryComplete(); + latchPtr->countdown(); statsPtr->add(brokerConsumerStats, index); } else { lock.unlock(); callback(res, BrokerConsumerStats()); return; } - if (completed) { + if (latchPtr->getCount() == 0) { lock.unlock(); callback(ResultOk, BrokerConsumerStats(statsPtr)); } @@ -907,6 +903,8 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c auto optConsumer = consumers_.find(msgId.getTopicName()); if (!optConsumer) { + LOG_ERROR(getName() << "cannot seek a message id whose topic \"" + msgId.getTopicName() + + "\" is not subscribed"); callback(ResultOperationNotSupported); return; } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index ce98dfcc..6763942f 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -98,7 +98,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { uint64_t getNumberOfConnectedConsumer() override; void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override; - void handleGetConsumerStats(Result, BrokerConsumerStats, SharedFuture, MultiTopicsBrokerConsumerStatsPtr, + void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); // return first topic name when all topics name valid, or return null pointer static std::shared_ptr topicNamesValid(const std::vector& topics); From e1a9016f1ef5dfce800a06452da05e04c9ef9ffd Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 27 May 2024 20:10:25 +0800 Subject: [PATCH 3/6] Address comments --- lib/MultiTopicsConsumerImpl.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 83287a6c..e95a9ac4 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -828,6 +828,7 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal std::make_shared(numberTopicPartitions_->load()); LatchPtr latchPtr = std::make_shared(numberTopicPartitions_->load()); lock.unlock(); + size_t i = 0; consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) { size_t index = i++; From 86dbf3129f63eb70b71bc8b0a983a33f0213632b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 27 May 2024 20:18:07 +0800 Subject: [PATCH 4/6] Add tests for new forEachValue API --- tests/SynchronizedHashMapTest.cc | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc index 85378e03..cf184d9f 100644 --- a/tests/SynchronizedHashMapTest.cc +++ b/tests/SynchronizedHashMapTest.cc @@ -91,6 +91,28 @@ TEST(SynchronizedHashMapTest, testForEach) { m.forEach([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); }); PairVector expectedPairs({{1, 100}, {2, 200}, {3, 300}}); ASSERT_EQ(sort(pairs), expectedPairs); + + m.clear(); + int result = 0; + values.clear(); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 1; }); + ASSERT_TRUE(values.empty()); + ASSERT_EQ(result, 1); + + m.emplace(1, 100); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 2; }); + ASSERT_EQ(values, (std::vector({100}))); + ASSERT_EQ(result, 1); + + values.clear(); + m.emplace(2, 200); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 2; }); + std::sort(values.begin(), values.end()); + ASSERT_EQ(values, (std::vector({100, 200}))); + ASSERT_EQ(result, 1); } TEST(SynchronizedHashMap, testRecursiveMutex) { From 5f3c6d6c134669b2cb267800397888544b696818 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 29 May 2024 14:43:23 +0800 Subject: [PATCH 5/6] Handle no internal consumers when seek --- lib/MultiTopicsConsumerImpl.cc | 5 +++++ lib/MultiTopicsConsumerImpl.h | 3 ++- tests/ConsumerSeekTest.cc | 6 ++++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index e95a9ac4..168af53a 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -897,6 +897,11 @@ void MultiTopicsConsumerImpl::afterSeek() { }); } +void MultiTopicsConsumerImpl::handleNoConsumersWhenSeek(const ResultCallback& callback) const { + LOG_ERROR(getName() << "There are no consumers when seek"); + callback(ResultOperationNotSupported); +} + void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { if (msgId == MessageId::earliest() || msgId == MessageId::latest()) { return seekAllAsync(msgId, callback); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 6763942f..3a253f42 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -185,6 +185,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void beforeSeek(); void afterSeek(); + void handleNoConsumersWhenSeek(const ResultCallback& callback) const; FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); @@ -228,7 +229,7 @@ template } }); }, - [callback] { callback(ResultOk); }); + [this, callback] { handleNoConsumersWhenSeek(callback); }); } } // namespace pulsar diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index 9b6378c4..c89f4ab0 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -194,6 +194,12 @@ TEST_P(ConsumerSeekTest, testMultiTopicsSeekSingle) { } } +TEST_F(ConsumerSeekTest, testNoInternalConsumer) { + Consumer consumer; + ASSERT_EQ(ResultOk, client_.subscribeWithRegex("testNoInternalConsumer.*", "sub", consumer)); + ASSERT_EQ(ResultOperationNotSupported, consumer.seek(MessageId::earliest())); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); } // namespace pulsar From ef5efc90b52821d849b7dbf88e013284615203fb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Mon, 3 Jun 2024 10:36:34 +0800 Subject: [PATCH 6/6] Revert the behavior change when there is no internal consumer --- lib/MultiTopicsConsumerImpl.cc | 5 ----- lib/MultiTopicsConsumerImpl.h | 3 +-- tests/ConsumerSeekTest.cc | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 168af53a..e95a9ac4 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -897,11 +897,6 @@ void MultiTopicsConsumerImpl::afterSeek() { }); } -void MultiTopicsConsumerImpl::handleNoConsumersWhenSeek(const ResultCallback& callback) const { - LOG_ERROR(getName() << "There are no consumers when seek"); - callback(ResultOperationNotSupported); -} - void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { if (msgId == MessageId::earliest() || msgId == MessageId::latest()) { return seekAllAsync(msgId, callback); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 3a253f42..6763942f 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -185,7 +185,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void beforeSeek(); void afterSeek(); - void handleNoConsumersWhenSeek(const ResultCallback& callback) const; FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); @@ -229,7 +228,7 @@ template } }); }, - [this, callback] { handleNoConsumersWhenSeek(callback); }); + [callback] { callback(ResultOk); }); } } // namespace pulsar diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc index c89f4ab0..f03ea5e3 100644 --- a/tests/ConsumerSeekTest.cc +++ b/tests/ConsumerSeekTest.cc @@ -197,7 +197,7 @@ TEST_P(ConsumerSeekTest, testMultiTopicsSeekSingle) { TEST_F(ConsumerSeekTest, testNoInternalConsumer) { Consumer consumer; ASSERT_EQ(ResultOk, client_.subscribeWithRegex("testNoInternalConsumer.*", "sub", consumer)); - ASSERT_EQ(ResultOperationNotSupported, consumer.seek(MessageId::earliest())); + ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); } INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));