Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segmentation fault when sending messages after receiving an error #326

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 39 additions & 51 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
return;
}
// Send CONNECT command to broker
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSentPulsarConnect(err, buffer);
}
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
handleSentPulsarConnect(err, buffer);
}));
}

void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err,
const SharedBuffer& buffer) {
if (isClosed()) {
return;
}
if (err) {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
Expand All @@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&

void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err,
const SharedBuffer& buffer) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
close();
Expand Down Expand Up @@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() {

void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred,
uint32_t minReadSize) {
if (isClosed()) {
return;
}
// Update buffer write idx with new data
incomingBuffer_.bytesWritten(bytesTransferred);

Expand Down Expand Up @@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
}

void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler(
[weakSelf, cmd](const boost::system::error_code& err, size_t bytesTransferred) {
auto self = weakSelf.lock();
if (self) {
self->handleSend(err, cmd);
}
}));
customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err,
size_t bytesTransferred) { handleSend(err, cmd); }));
}

void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
Expand All @@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
pendingWriteBuffers_.emplace_back(args);
return;
}
auto weakSelf = weak_from_this();
auto sendMessageInternal = [this, weakSelf, args] {
auto self = weakSelf.lock();
if (!self) {
return;
}
auto self = shared_from_this();
auto sendMessageInternal = [this, self, args] {
BaseCommand outgoingCmd;
auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the underlying socket send.
asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weakSelf.lock();
if (self) {
self->handleSendPair(err);
}
}));
asyncWrite(buffer, customAllocWriteHandler(
[this, self, buffer](const boost::system::error_code& err,
size_t bytesTransferred) { handleSendPair(err); }));
};
if (tlsSocket_) {
#if BOOST_VERSION >= 106600
Expand All @@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
}

void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
close(ResultDisconnected);
Expand All @@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
}

void ClientConnection::handleSendPair(const boost::system::error_code& err) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
close(ResultDisconnected);
Expand All @@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() {
boost::any any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();

auto weakSelf = weak_from_this();
auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
asyncWrite(
buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSend(err, buffer);
}
}));
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
size_t) { handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));

Expand All @@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() {

// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the underlying socket send.
asyncWrite(buffer, customAllocWriteHandler(
[weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSendPair(err);
}
}));
asyncWrite(buffer,
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
size_t) { handleSendPair(err); }));
}
} else {
// No more pending writes
Expand Down Expand Up @@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool detach) {
}

lock.unlock();
int refCount = weak_from_this().use_count();
if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
}
// Remove the connection from the pool before completing any promise
if (detach) {
Expand Down Expand Up @@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() {
close(result);
return;
}
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSentAuthResponse(err, buffer);
}
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
handleSentAuthResponse(err, buffer);
}));
}

Expand Down
9 changes: 8 additions & 1 deletion run-unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down

# Run BrokerMetadata tests
docker compose -f tests/brokermetadata/docker-compose.yml up -d
sleep 15
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
sleep 5
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
docker compose -f tests/brokermetadata/docker-compose.yml down

docker compose -f tests/chunkdedup/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
sleep 5
$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
docker compose -f tests/chunkdedup/docker-compose.yml down

./pulsar-test-service-start.sh

pushd $CMAKE_BUILD_DIRECTORY/tests
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIB
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
53 changes: 53 additions & 0 deletions tests/chunkdedup/ChunkDedupTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include "lib/Latch.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

// Before https://github.com/apache/pulsar/pull/20948, when message deduplication is enabled, sending chunks
// to the broker will receive send error response.
TEST(ChunkDedupTest, testSendChunks) {
Client client{"pulsar://localhost:6650"};
ProducerConfiguration conf;
conf.setBatchingEnabled(false);
conf.setChunkingEnabled(true);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, producer));

Latch latch{1};
std::string value(1024000 /* max message size */ * 100, 'a');
producer.sendAsync(MessageBuilder().setContent(value).build(),
[&latch](Result result, const MessageId& msgId) {
LOG_INFO("Send to " << msgId << ": " << result);
latch.countdown();
});
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
client.close();
}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
44 changes: 44 additions & 0 deletions tests/chunkdedup/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

version: '3'
networks:
pulsar:
driver: bridge
services:
standalone:
image: apachepulsar/pulsar:3.1.0 # Ensure https://github.com/apache/pulsar/pull/20948 is not included
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add a comment like "Don't change the pulsar version here.". This test could only work before 3.1.0, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

container_name: standalone
hostname: local
restart: "no"
networks:
- pulsar
environment:
- metadataStoreUrl=zk:localhost:2181
- clusterName=standalone
- advertisedAddress=localhost
- advertisedListeners=external:pulsar://localhost:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
- PULSAR_PREFIX_maxMessageSize=1024000
- PULSAR_PREFIX_brokerDeduplicationEnabled=true
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw"