Skip to content

Commit

Permalink
[feat] Support partitioned topic reader.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jan 19, 2023
1 parent 9ed6a45 commit d372f55
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 67 deletions.
11 changes: 3 additions & 8 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}

if (partitionMetadata->getPartitions() > 0) {
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
callback(ResultOperationNotSupported, Reader());
return;
}

ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(),
partitionMetadata->getPartitions(), conf,
getListenerExecutorProvider()->get(), callback);
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
ConsumerImplBasePtr consumer = reader->getConsumer();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;

virtual void disconnectConsumer();
Result fetchSingleMessageFromBroker(Message& msg);
Expand All @@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase {
virtual void redeliverMessages(const std::set<MessageId>& messageIds);

virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
void beforeConnectionChange(ClientConnection& cnx) override;

protected:
Expand Down
2 changes: 2 additions & 0 deletions lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "HandlerBase.h"

namespace pulsar {
typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
class ConsumerImplBase;
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
class OpBatchReceive {
Expand Down Expand Up @@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
virtual uint64_t getNumberOfConnectedConsumer() = 0;
// overrided methods from HandlerBase
virtual const std::string& getName() const override = 0;
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;

protected:
// overrided methods from HandlerBase
Expand Down
57 changes: 50 additions & 7 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ using namespace pulsar;
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
int numPartitions, const std::string& subscriptionName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
LookupServicePtr lookupServicePtr,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
lookupServicePtr) {
lookupServicePtr, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}

MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
LookupServicePtr lookupServicePtr,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
Expand All @@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
messageListener_(conf.getMessageListener()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics) {
topics_(topics),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
Expand Down Expand Up @@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor, true,
NonPartitioned);
NonPartitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand All @@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
std::string topicPartitionName = topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor,
true, Partitioned);
true, Partitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand Down Expand Up @@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
}

void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
callback(ResultOperationNotSupported);
msgId.getTopicName();
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
}
}

void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
Expand Down Expand Up @@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
partitionsUpdateTimer_->cancel(ec);
}
}

void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
if (incomingMessagesSize_ > 0) {
callback(ResultOk, true);
return;
}

auto hasMessageAvailable = std::make_shared<std::atomic<bool>>();
auto needCallBack = std::make_shared<std::atomic<int>>(consumers_.size());
auto self = get_shared_this_ptr();

consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
consumer->hasMessageAvailableAsync(
[self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
if (result != ResultOk) {
LOG_ERROR("Filed when acknowledge list: " << result);
// set needCallBack is -1 to avoid repeated callback.
needCallBack->store(-1);
callback(result, false);
return;
}

if (hasMsg) {
hasMessageAvailable->store(hasMsg);
}

if (--(*needCallBack) == 0) {
callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
}
});
});
}
14 changes: 12 additions & 2 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "BlockingQueue.h"
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "Future.h"
#include "Latch.h"
Expand Down Expand Up @@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr);
LookupServicePtr lookupServicePtr,
const Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
boost::optional<MessageId> startMessageId = boost::none);

MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
const Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
boost::optional<MessageId> startMessageId = boost::none);

~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
Expand Down Expand Up @@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
Expand Down Expand Up @@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string> topics_;
std::queue<ReceiveCallback> pendingReceives_;
const Commands::SubscriptionMode subscriptionMode_;
boost::optional<MessageId> startMessageId_;

/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
Expand Down Expand Up @@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
Expand Down
29 changes: 22 additions & 7 deletions lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "MultiTopicsConsumerImpl.h"
#include "TopicName.h"

namespace pulsar {
Expand All @@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader;

static ResultCallback emptyCallback;

ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
ReaderCallback readerCreatedCallback)
: topic_(topic),
partitions_(partitions),
client_(client),
readerConf_(conf),
readerCreatedCallback_(readerCreatedCallback) {}

void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
Expand Down Expand Up @@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId,
test::consumerConfigOfReader = consumerConf.clone();
}

consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
if (partitions_ > 0) {
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId);
consumer_ = consumerImpl;
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
startMessageId);
consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_ = consumerImpl;
}
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
Expand Down
10 changes: 6 additions & 4 deletions lib/ReaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;

class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
public:
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
ReaderCallback readerCreatedCallback);

void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);

Expand All @@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>

Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();

ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }

void hasMessageAvailableAsync(HasMessageAvailableCallback callback);

Expand All @@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
void acknowledgeIfNecessary(Result result, const Message& msg);

std::string topic_;
int partitions_;
ClientImplWeakPtr client_;
ReaderConfiguration readerConf_;
ConsumerImplPtr consumer_;
ConsumerImplBasePtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
};
Expand Down
1 change: 1 addition & 0 deletions lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};
} // namespace pulsar

Expand Down
47 changes: 47 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) {
ASSERT_EQ(0, result.size());
}

TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) {
Client client(lookupUrl);

const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr));
const std::string subName = "sub";

int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions",
std::to_string(2));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

Consumer consumer;
ConsumerConfiguration consumerConfiguration;
consumerConfiguration.setUnAckedMessagesTimeoutMs(10000);
ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));

Producer producer;
ProducerConfiguration producerConfiguration;
producerConfiguration.setBatchingEnabled(false);
producerConfiguration.setPartitionsRoutingMode(
ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));

const int numMessages = 100;
for (int i = 0; i < numMessages; ++i) {
Message msg = MessageBuilder().setContent(std::to_string(i)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

Message msg;
for (int i = 0; i < numMessages; i++) {
ASSERT_EQ(ResultOk, consumer.receive(msg));
// The last message of each partition topic be ACK
if (i >= numMessages - 2) {
consumer.acknowledgeCumulative(msg.getMessageId());
}
}
ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000));

// Assert that there is no message in the tracker.
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
auto tracker =
static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImpl->unAckedMessageTrackerPtr_.get());
ASSERT_EQ(0, tracker->size());

client.close();
}

TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;

Expand Down
2 changes: 1 addition & 1 deletion tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class PulsarFriend {
}

static ConsumerImplPtr getConsumer(Reader reader) {
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer());
}

static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
Expand Down
Loading

0 comments on commit d372f55

Please sign in to comment.