Skip to content

Commit

Permalink
[consumer] Support parse broker metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangJian He committed May 30, 2023
1 parent 4209987 commit 4969de7
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 2 deletions.
27 changes: 25 additions & 2 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
if (incomingCmd.type() == BaseCommand::MESSAGE) {
// Parse message metadata and extract payload
proto::MessageMetadata msgMetadata;
proto::BrokerEntryMetadata brokerEntryMetadata;

// read checksum
uint32_t remainingBytes = frameSize - (cmdSize + 4);
bool isChecksumValid = verifyChecksum(incomingBuffer_, remainingBytes, incomingCmd);

auto readerIndex = incomingBuffer_.readerIndex();
if (incomingBuffer_.readUnsignedShort() == Commands::magicBrokerEntryMetadata) {
// broker entry metadata is present
uint32_t brokerEntryMetadataSize = incomingBuffer_.readUnsignedInt();
if (!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), brokerEntryMetadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id()
<< ", message ledger id "
<< incomingCmd.message().message_id().ledgerid() << ", entry id "
<< incomingCmd.message().message_id().entryid()
<< "] Error parsing broker entry metadata");
close();
return;
}

incomingBuffer_.consume(brokerEntryMetadataSize);
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}

uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " << incomingCmd.message().consumer_id() //
Expand All @@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
uint32_t payloadSize = remainingBytes;
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), payloadSize);
incomingBuffer_.consume(payloadSize);
handleIncomingMessage(incomingCmd.message(), isChecksumValid, msgMetadata, payload);
handleIncomingMessage(incomingCmd.message(), isChecksumValid, brokerEntryMetadata, msgMetadata,
payload);
} else {
handleIncomingCommand(incomingCmd);
}
Expand All @@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
// We still have 1 to 3 bytes from the next frame
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));

// Restart with a new buffer and copy the the few bytes at the beginning
// Restart with a new buffer and copy the few bytes at the beginning
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, DefaultBufferSize);

// At least we need to read 4 bytes to have the complete frame size
Expand Down Expand Up @@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const proto::CommandActiveCons
}

void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::BrokerEntryMetadata& brokerEntryMetadata,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: " << msg.consumer_id());

Expand Down
2 changes: 2 additions & 0 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct OpSendMsg;

namespace proto {
class BaseCommand;
class BrokerEntryMetadata;
class CommandActiveConsumerChange;
class CommandAckResponse;
class CommandMessage;
Expand Down Expand Up @@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& change);
void handleIncomingCommand(proto::BaseCommand& incomingCmd);
void handleIncomingMessage(const proto::CommandMessage& msg, bool isChecksumValid,
proto::BrokerEntryMetadata& brokerEntryMetadata,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload);

void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
Expand Down
3 changes: 3 additions & 0 deletions lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ class Commands {
};

const static uint16_t magicCrc32c = 0x0e01;

const static uint16_t magicBrokerEntryMetadata = 0x0e02;

const static int checksumSize = 4;

static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
Expand Down
6 changes: 6 additions & 0 deletions run-unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ sleep 15
$CMAKE_BUILD_DIRECTORY/tests/Oauth2Test
docker compose -f tests/oauth2/docker-compose.yml down

# Run BrokerMetadata tests
docker compose -f tests/brokermetadata/docker-compose.yml up -d
sleep 15
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
docker compose -f tests/brokermetadata/docker-compose.yml down

./pulsar-test-service-start.sh

pushd $CMAKE_BUILD_DIRECTORY/tests
Expand Down
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ if (UNIX)
target_link_libraries(ConnectionFailTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
endif ()

add_executable(BrokerMetadataTest brokermetadata/BrokerMetadataTest.cc)
target_compile_options(BrokerMetadataTest PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
52 changes: 52 additions & 0 deletions tests/brokermetadata/BrokerMetadataTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Run `docker-compose up -d` to set up the test environment for this test.
#include <gtest/gtest.h>
#include <pulsar/Client.h>

using namespace pulsar;

#ifndef TEST_ROOT_PATH
#define TEST_ROOT_PATH "."
#endif

TEST(BrokerMetadataTest, testConsumeSuccess) {
Client client{"pulsar://localhost:6650"};
Producer producer;
Result producerResult = client.createProducer("persistent://public/default/testConsumeSuccess", producer);
ASSERT_EQ(producerResult, ResultOk);
Consumer consumer;
Result consumerResult =
client.subscribe("persistent://public/default/testConsumeSuccess", "testConsumeSuccess", consumer);
ASSERT_EQ(consumerResult, ResultOk);
const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
Result sendResult = producer.send(msg);
ASSERT_EQ(sendResult, ResultOk);
Message receivedMsg;
Result receiveResult = consumer.receive(receivedMsg);
ASSERT_EQ(receiveResult, ResultOk);
ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
client.close();
}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
return 0;
}
43 changes: 43 additions & 0 deletions tests/brokermetadata/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

version: '3'
networks:
pulsar:
driver: bridge
services:
standalone:
image: apachepulsar/pulsar:latest
container_name: standalone
hostname: local
restart: "no"
networks:
- pulsar
environment:
- metadataStoreUrl=zk:localhost:2181
- clusterName=standalone-broker-metadata
- advertisedAddress=localhost
- advertisedListeners=external:pulsar://localhost:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
- PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
- PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw"

0 comments on commit 4969de7

Please sign in to comment.