diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 6d18780f..b43143a9 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) { return; } // Send CONNECT command to broker - auto weakSelf = weak_from_this(); + auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) { - auto self = weakSelf.lock(); - if (self) { - self->handleSentPulsarConnect(err, buffer); - } + customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + handleSentPulsarConnect(err, buffer); })); } void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err, const SharedBuffer& buffer) { + if (isClosed()) { + return; + } if (err) { LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); close(); @@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err, const SharedBuffer& buffer) { + if (isClosed()) { + return; + } if (err) { LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message()); close(); @@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() { void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred, uint32_t minReadSize) { + if (isClosed()) { + return; + } // Update buffer write idx with new data incomingBuffer_.bytesWritten(bytesTransferred); @@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) { } void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) { - auto weakSelf = weak_from_this(); + auto self = shared_from_this(); asyncWrite(cmd.const_asio_buffer(), - customAllocWriteHandler( - [weakSelf, cmd](const boost::system::error_code& err, size_t bytesTransferred) { - auto self = weakSelf.lock(); - if (self) { - self->handleSend(err, cmd); - } - })); + customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err, + size_t bytesTransferred) { handleSend(err, cmd); })); } void ClientConnection::sendMessage(const std::shared_ptr& args) { @@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const std::shared_ptr& args) { pendingWriteBuffers_.emplace_back(args); return; } - auto weakSelf = weak_from_this(); - auto sendMessageInternal = [this, weakSelf, args] { - auto self = weakSelf.lock(); - if (!self) { - return; - } + auto self = shared_from_this(); + auto sendMessageInternal = [this, self, args] { BaseCommand outgoingCmd; auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. - asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, - size_t bytesTransferred) { - auto self = weakSelf.lock(); - if (self) { - self->handleSendPair(err); - } - })); + asyncWrite(buffer, customAllocWriteHandler( + [this, self, buffer](const boost::system::error_code& err, + size_t bytesTransferred) { handleSendPair(err); })); }; if (tlsSocket_) { #if BOOST_VERSION >= 106600 @@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const std::shared_ptr& args) { } void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) { + if (isClosed()) { + return; + } if (err) { LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message()); close(ResultDisconnected); @@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh } void ClientConnection::handleSendPair(const boost::system::error_code& err) { + if (isClosed()) { + return; + } if (err) { LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message()); close(ResultDisconnected); @@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() { boost::any any = pendingWriteBuffers_.front(); pendingWriteBuffers_.pop_front(); - auto weakSelf = weak_from_this(); + auto self = shared_from_this(); if (any.type() == typeid(SharedBuffer)) { SharedBuffer buffer = boost::any_cast(any); - asyncWrite( - buffer.const_asio_buffer(), - customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) { - auto self = weakSelf.lock(); - if (self) { - self->handleSend(err, buffer); - } - })); + asyncWrite(buffer.const_asio_buffer(), + customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, + size_t) { handleSend(err, buffer); })); } else { assert(any.type() == typeid(std::shared_ptr)); @@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() { // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the // callback is called, an invalid buffer range might be passed to the underlying socket send. - asyncWrite(buffer, customAllocWriteHandler( - [weakSelf, buffer](const boost::system::error_code& err, size_t) { - auto self = weakSelf.lock(); - if (self) { - self->handleSendPair(err); - } - })); + asyncWrite(buffer, + customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, + size_t) { handleSendPair(err); })); } } else { // No more pending writes @@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool detach) { } lock.unlock(); + int refCount = weak_from_this().use_count(); if (!isResultRetryable(result)) { - LOG_ERROR(cnxString_ << "Connection closed with " << result); + LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")"); } else { - LOG_INFO(cnxString_ << "Connection disconnected"); + LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")"); } // Remove the connection from the pool before completing any promise if (detach) { @@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() { close(result); return; } - auto weakSelf = weak_from_this(); + auto self = shared_from_this(); asyncWrite(buffer.const_asio_buffer(), - customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) { - auto self = weakSelf.lock(); - if (self) { - self->handleSentAuthResponse(err, buffer); - } + customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) { + handleSentAuthResponse(err, buffer); })); } diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 8ea834cf..693267ff 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down # Run BrokerMetadata tests docker compose -f tests/brokermetadata/docker-compose.yml up -d -sleep 15 +until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done +sleep 5 $CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest docker compose -f tests/brokermetadata/docker-compose.yml down +docker compose -f tests/chunkdedup/docker-compose.yml up -d +until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done +sleep 5 +$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10 +docker compose -f tests/chunkdedup/docker-compose.yml down + ./pulsar-test-service-start.sh pushd $CMAKE_BUILD_DIRECTORY/tests diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6b464251..b3731964 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIB add_executable(Oauth2Test oauth2/Oauth2Test.cc) target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"") target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH}) + +add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc) +target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH}) diff --git a/tests/chunkdedup/ChunkDedupTest.cc b/tests/chunkdedup/ChunkDedupTest.cc new file mode 100644 index 00000000..609511f2 --- /dev/null +++ b/tests/chunkdedup/ChunkDedupTest.cc @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include + +#include "lib/Latch.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +using namespace pulsar; + +// Before https://github.com/apache/pulsar/pull/20948, when message deduplication is enabled, sending chunks +// to the broker will receive send error response. +TEST(ChunkDedupTest, testSendChunks) { + Client client{"pulsar://localhost:6650"}; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + conf.setChunkingEnabled(true); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, producer)); + + Latch latch{1}; + std::string value(1024000 /* max message size */ * 100, 'a'); + producer.sendAsync(MessageBuilder().setContent(value).build(), + [&latch](Result result, const MessageId& msgId) { + LOG_INFO("Send to " << msgId << ": " << result); + latch.countdown(); + }); + ASSERT_TRUE(latch.wait(std::chrono::seconds(10))); + client.close(); +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/chunkdedup/docker-compose.yml b/tests/chunkdedup/docker-compose.yml new file mode 100644 index 00000000..6aed8c47 --- /dev/null +++ b/tests/chunkdedup/docker-compose.yml @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +version: '3' +networks: + pulsar: + driver: bridge +services: + standalone: + # Don't change the version here to ensure https://github.com/apache/pulsar/pull/20948 is not included + image: apachepulsar/pulsar:3.1.0 + container_name: standalone + hostname: local + restart: "no" + networks: + - pulsar + environment: + - metadataStoreUrl=zk:localhost:2181 + - clusterName=standalone + - advertisedAddress=localhost + - advertisedListeners=external:pulsar://localhost:6650 + - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + - PULSAR_PREFIX_maxMessageSize=1024000 + - PULSAR_PREFIX_brokerDeduplicationEnabled=true + ports: + - "6650:6650" + - "8080:8080" + command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw" +