Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cpp producer sequence id changes #763

Merged
merged 4 commits into from
Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pulsar-client-cpp/include/pulsar/MessageBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ class MessageBuilder {
*/
MessageBuilder& setEventTimestamp(uint64_t eventTimestamp);

/**
* Specify a custom sequence id for the message being published.
* <p>
* The sequence id can be used for deduplication purposes and it needs to follow these rules:
* <ol>
* <li><code>sequenceId >= 0</code>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion for this - also why not use uint64_t

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check. Keeping int64_t for consistency with initialSequence id which can be negative.

* <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
* <code>sequenceId(N+1) > sequenceId(N)</code>
* <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
* <code>sequenceId</code> could represent an offset or a cumulative size.
* </ol>
*
* @param sequenceId
* the sequence id to assign to the current message
* @since 1.20.0
*/
MessageBuilder& setSequenceId(int64_t sequenceId);

/**
* override namespace replication clusters. note that it is the
* caller's responsibility to provide valid cluster names, and that
Expand Down
18 changes: 18 additions & 0 deletions pulsar-client-cpp/include/pulsar/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class Producer {
*/
const std::string& getTopic() const;

/**
* @return the producer name which could have been assigned by the system or specified by the client
*/
const std::string& getProducerName() const;

/**
* Publish a message on the topic associated with this Producer.
*
Expand Down Expand Up @@ -75,6 +80,19 @@ class Producer {
*/
void sendAsync(const Message& msg, SendCallback callback);

/**
* Get the last sequence id that was published by this producer.
*
* This represent either the automatically assigned or custom sequence id (set on the MessageBuilder) that
* was published and acknowledged by the broker.
*
* After recreating a producer with the same producer name, this will return the last message that was published in
* the previous producer session, or -1 if there no message was ever published.
*
* @return the last sequence id published by this producer
*/
int64_t getLastSequenceId() const;

/**
* Close the producer and release resources allocated.
*
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ class ProducerConfiguration {
ProducerConfiguration(const ProducerConfiguration&);
ProducerConfiguration& operator=(const ProducerConfiguration&);

ProducerConfiguration& setProducerName(const std::string& producerName);
const std::string& getProducerName() const;

ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
int getSendTimeout() const;

ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint64_t

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be -1 (default value)

int64_t getInitialSequenceId() const;

ProducerConfiguration& setCompressionType(CompressionType compressionType);
CompressionType getCompressionType() const;

Expand Down
9 changes: 5 additions & 4 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();

requestData.promise.setValue("");
requestData.promise.setValue({"", -1});
requestData.timer->cancel();
}
break;
Expand Down Expand Up @@ -825,7 +825,8 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();

requestData.promise.setValue(producerSuccess.producer_name());
requestData.promise.setValue(
{ producerSuccess.producer_name(), producerSuccess.last_sequence_id() });
requestData.timer->cancel();
}
break;
Expand Down Expand Up @@ -1061,12 +1062,12 @@ void ClientConnection::sendPendingCommands() {
}
}

Future<Result, std::string> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cmd, int requestId) {
Lock lock(mutex_);

if (isClosed()) {
lock.unlock();
Promise<Result, std::string> promise;
Promise<Result, ResponseData> promise;
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class LookupDataResult;

struct OpSendMsg;

typedef std::pair<std::string, int64_t> ResponseData;

class ClientConnection : public boost::enable_shared_from_this<ClientConnection> {
enum State {
Pending,
Expand Down Expand Up @@ -125,7 +127,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
* Send a request with a specific Id over the connection. The future will be
* triggered when the response for this request is received
*/
Future<Result, std::string> sendRequestWithId(SharedBuffer cmd, int requestId);
Future<Result, ResponseData> sendRequestWithId(SharedBuffer cmd, int requestId);

const std::string& brokerAddress() const;

Expand All @@ -138,7 +140,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId) ;
private:
struct PendingRequestData {
Promise<Result, std::string> promise;
Promise<Result, ResponseData> promise;
DeadlineTimerPtr timer;
};

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
// Lock is no longer required
lock.unlock();
int requestId = client->newRequestId();
Future<Result, std::string> future = cnx->sendRequestWithId(
Future<Result, ResponseData> future = cnx->sendRequestWithId(
Commands::newCloseConsumer(consumerId_, requestId), requestId);
if (!callback.empty()) {
future.addListener(
Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/MessageBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ MessageBuilder& MessageBuilder::setEventTimestamp(uint64_t eventTimestamp) {
return *this;
}

MessageBuilder& MessageBuilder::setSequenceId(int64_t sequenceId) {
if (sequenceId < 0) {
throw "sequenceId needs to be >= 0";
}
checkMetadata();
impl_->metadata.set_sequence_id(sequenceId);
return *this;
}

MessageBuilder& MessageBuilder::setReplicationClusters(const std::vector<std::string>& clusters) {
checkMetadata();
google::protobuf::RepeatedPtrField<std::string> r(clusters.begin(), clusters.end());
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ namespace pulsar {
lock.unlock();
}

const std::string& PartitionedProducerImpl::getProducerName() const {
return producers_[0]->getProducerName();
}

int64_t PartitionedProducerImpl::getLastSequenceId() const {
int64_t currentMax = -1L;
for (int i = 0; i < producers_.size(); i++) {
currentMax = std::max(currentMax, producers_[i]->getLastSequenceId());
}

return currentMax;
}

/*
* if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to create
* one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/PartitionedProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ namespace pulsar {
*/
virtual void closeAsync(CloseCallback closeCallback);

virtual const std::string& getProducerName() const;

virtual int64_t getLastSequenceId() const;

virtual void start();

virtual void shutdown();
Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ void Producer::sendAsync(const Message& msg, SendCallback callback) {
impl_->sendAsync(msg, callback);
}

const std::string& Producer::getProducerName() const {
return impl_->getProducerName();
}

int64_t Producer::getLastSequenceId() const {
return impl_->getLastSequenceId();
}

Result Producer::close() {
Promise<bool, Result> promise;
closeAsync(WaitForCallback(promise));
Expand Down
19 changes: 19 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfigurat
return *this;
}

ProducerConfiguration& ProducerConfiguration::setProducerName(const std::string& producerName) {
impl_->producerName = Optional<std::string>::of(producerName);
return *this;
}

const std::string& ProducerConfiguration::getProducerName() const {
static const std::string emptyString;
return impl_->producerName.is_present() ? impl_->producerName.value() : emptyString;
}

ProducerConfiguration& ProducerConfiguration::setInitialSequenceId(int64_t initialSequenceId) {
impl_->initialSequenceId = Optional<int64_t>::of(initialSequenceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disallow negative value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed your earlier comment. Should we allow any negative in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, initial can also be -1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to someone setting it to -10 for example. Do we have to reject any value less than -1? Ot it doesn't matter for us?

return *this;
}

int64_t ProducerConfiguration::getInitialSequenceId() const {
return impl_->initialSequenceId.is_present() ? impl_->initialSequenceId.value() : -1ll;
}

ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) {
impl_->sendTimeoutMs = sendTimeoutMs;
return *this;
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
#include <pulsar/ProducerConfiguration.h>
#include <boost/make_shared.hpp>

#include "Utils.h"

namespace pulsar {

struct ProducerConfigurationImpl {
Optional<std::string> producerName;
Optional<int64_t> initialSequenceId;
int sendTimeoutMs;
CompressionType compressionType;
int maxPendingMessages;
Expand Down
36 changes: 33 additions & 3 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,18 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
producerName_(conf_.getProducerName()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
producerId_(client->newProducerId()),
msgSequenceGenerator_(0),
sendTimer_() {
LOG_DEBUG(
"ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_);

int64_t initialSequenceId = conf.getInitialSequenceId();
lastSequenceIdPublished_ = initialSequenceId;
msgSequenceGenerator_ = initialSequenceId + 1;

// boost::ref is used to drop the constantness constraint of make_shared
if (conf_.getBatchingEnabled()) {
batchMessageContainer = boost::make_shared<BatchMessageContainer>(boost::ref(*this));
Expand All @@ -86,6 +92,14 @@ const std::string& ProducerImpl::getTopic() const {
return topic_;
}

const std::string& ProducerImpl::getProducerName() const {
return producerName_;
}

int64_t ProducerImpl::getLastSequenceId() const {
return lastSequenceIdPublished_;
}

void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
Lock lock(mutex_);
if (state_ == Closed) {
Expand Down Expand Up @@ -114,12 +128,14 @@ void ProducerImpl::connectionFailed(Result result) {
}

void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName) {
const ResponseData& responseData) {
LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));

if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
const std::string& producerName = responseData.first;
int64_t lastSequenceId = responseData.second;
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());

Lock lock(mutex_);
Expand All @@ -129,6 +145,11 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
if (batchMessageContainer) {
batchMessageContainer->producerName_ = producerName_;
}

if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
lastSequenceIdPublished_ = lastSequenceId;
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
connection_ = cnx;
state_ = Ready;
Expand Down Expand Up @@ -297,7 +318,13 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
return;
}

setMessageMetadata(msg, msgSequenceGenerator_++, uncompressedSize);
int64_t sequenceId;
if (!msg.impl_->metadata.has_sequence_id()) {
sequenceId = msgSequenceGenerator_++;
} else {
sequenceId = msg.impl_->metadata.sequence_id();
}
setMessageMetadata(msg, sequenceId, uncompressedSize);

// reserving a spot and going forward - not blocking
if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) {
Expand Down Expand Up @@ -402,7 +429,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
return;
}
int requestId = client->newRequestId();
Future<Result, std::string> future = cnx->sendRequestWithId(
Future<Result, ResponseData> future = cnx->sendRequestWithId(
Commands::newCloseProducer(producerId_, requestId), requestId);
if (!callback.empty()) {
future.addListener(
Expand Down Expand Up @@ -539,6 +566,9 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
// -1 since the pushing batch message into the queue already released a spot
pendingMessagesQueue_.release(op.msg_.impl_->metadata.num_messages_in_batch() - 1);
}

lastSequenceIdPublished_ = sequenceId + op.msg_.impl_->metadata.num_messages_in_batch() - 1;

lock.unlock();
if (op.sendCallback_) {
try {
Expand Down
10 changes: 8 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P

virtual void disconnectProducer();

const std::string& getProducerName() const;

int64_t getLastSequenceId() const;

uint64_t getProducerId() const;

virtual void start();
Expand Down Expand Up @@ -111,7 +115,7 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
void printStats();

void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const std::string& producerName);
const ResponseData& responseData);

void statsCallBackHandler(Result , const Message& , SendCallback , boost::posix_time::ptime );

Expand All @@ -130,10 +134,12 @@ class ProducerImpl : public HandlerBase, public boost::enable_shared_from_this<P
std::string producerName_;
std::string producerStr_;
uint64_t producerId_;
uint64_t msgSequenceGenerator_;
int64_t msgSequenceGenerator_;
proto::BaseCommand cmd_;
BatchMessageContainerPtr batchMessageContainer;

volatile int64_t lastSequenceIdPublished_;

typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ProducerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class ProducerImplBase {
virtual ~ProducerImplBase(){
}

virtual const std::string& getProducerName() const = 0;

virtual int64_t getLastSequenceId() const = 0;

virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
virtual void closeAsync(CloseCallback callback) = 0;
virtual void start() = 0;
Expand Down
Loading