Skip to content

Commit

Permalink
[consumer] Support parse broker metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangJian He committed May 30, 2023
1 parent 4209987 commit e6a57ee
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 2 deletions.
27 changes: 25 additions & 2 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
if (incomingCmd.type() == BaseCommand::MESSAGE) {
// Parse message metadata and extract payload
proto::MessageMetadata msgMetadata;
proto::BrokerEntryMetadata brokerEntryMetadata;

// read checksum
uint32_t remainingBytes = frameSize - (cmdSize + 4);
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);

auto readerIndex = incomingBuffer_.readerIndex();
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
// broker entry metadata is present
uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
<< ", message ledger id "
<< incomingCmd.message().message_id().ledgerid() //
<< ", entry id " << incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
close();
return;
}

incomingBuffer_.consume(brokerEntryMetadataSize);
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}

uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
Expand All @@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
uint32_t payloadSize = remainingBytes;
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
incomingBuffer_.consume(payloadSize);
handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload);
handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
payload);
} else {
handleIncomingCommand(incomingCmd);
}
Expand All @@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
// We still have 1 to 3 bytes from the next frame
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));

// Restart with a new buffer and copy the the few bytes at the beginning
// Restart with a new buffer and copy the few bytes at the beginning
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);

// At least we need to read 4 bytes to have the complete frame size
Expand Down Expand Up @@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveCons
}

void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::BrokerEntryMetadata brokerEntryMetadata,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());

Expand Down
2 changes: 2 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct OpSendMsg;

namespace proto {
class BaseCommand;
class BrokerEntryMetadata;
class CommandActiveConsumerChange;
class CommandAckResponse;
class CommandMessage;
Expand Down Expand Up @@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change);
void handleIncomingCommand(proto::BaseCommand& incomingCmd);
void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::BrokerEntryMetadata& brokerEntryMetadata,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload);

void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
Expand Down
3 changes: 3 additions & 0 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class Commands {
};

const static uint16_t magicCrc32c = 0x0e01;

const static uint16_t magicBrokerEntryMetadata = 0x0e02;

const static int checksumSize = 4;

static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
Expand Down
1 change: 1 addition & 0 deletions lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class TopicName;
struct OpSendMsg;

namespace proto {
class BrokerEntryMetadata;
class MessageMetadata;
} // namespace proto

Expand Down

0 comments on commit e6a57ee

Please sign in to comment.