Skip to content

Commit

Permalink
Fix segmentation fault when sending messages after receiving an error
Browse files Browse the repository at this point in the history
Fixes apache#325

### Motivation

apache#317 introduces a bug
that might cause segmentation fault when sending messages after
receiving an error, see
apache#325 (comment)
for the detailed explanation.

### Modifications

When calling `asyncWrite`, capture the `shared_ptr` instead of the
`weak_ptr` to extend the lifetime of the `socket_` or `tlsSocket_` field
in `ClientConnection`. Since the lifetime is extended, in some
callbacks, check `isClosed()` before other logic.

Add a `ChunkDedupTest` to reproduce this issue based on Pulsar 3.1.0.
Run the test for 10 times to ensure it won't crash after this patch.
  • Loading branch information
BewareMyPower committed Oct 8, 2023
1 parent eea59bb commit 1beb597
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 67 deletions.
120 changes: 54 additions & 66 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,18 +519,18 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
return;
}
// Send CONNECT command to broker
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSentPulsarConnect(err, buffer);
}
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
handleSentPulsarConnect(err, buffer);
}));
}

void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err,
const SharedBuffer& buffer) {
if (isClosed()) {
return;
}
if (err) {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
Expand All @@ -543,6 +543,9 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&

void ClientConnection::handleSentAuthResponse(const boost::system::error_code& err,
const SharedBuffer& buffer) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Failed to send auth response: " << err.message());
close();
Expand Down Expand Up @@ -637,11 +640,11 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,

void ClientConnection::readNextCommand() {
const static uint32_t minReadSize = sizeof(uint32_t);
auto weakSelf = weak_from_this();
auto weak_self = weak_from_this();
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler([weakSelf](const boost::system::error_code& err, size_t bytesTransferred) {
auto self = weakSelf.lock();
customAllocReadHandler([weak_self](const boost::system::error_code& err, size_t bytesTransferred) {
auto self = weak_self.lock();
if (self) {
self->handleRead(err, bytesTransferred, minReadSize);
}
Expand All @@ -650,6 +653,9 @@ void ClientConnection::readNextCommand() {

void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred,
uint32_t minReadSize) {
if (isClosed()) {
return;
}
// Update buffer write idx with new data
incomingBuffer_.bytesWritten(bytesTransferred);

Expand All @@ -666,12 +672,12 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
// Read the remaining part, use a slice of buffer to write on the next
// region
SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
auto weakSelf = weak_from_this();
auto weak_self = weak_from_this();
auto nextMinReadSize = minReadSize - bytesTransferred;
asyncReceive(buffer.asio_buffer(),
customAllocReadHandler([weakSelf, nextMinReadSize](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weakSelf.lock();
customAllocReadHandler([weak_self, nextMinReadSize](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weak_self.lock();
if (self) {
self->handleRead(err, bytesTransferred, nextMinReadSize);
}
Expand Down Expand Up @@ -701,12 +707,12 @@ void ClientConnection::processIncomingBuffer() {
uint32_t newBufferSize = std::max<uint32_t>(DefaultBufferSize, frameSize + sizeof(uint32_t));
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
}
auto weakSelf = weak_from_this();
auto weak_self = weak_from_this();
asyncReceive(
incomingBuffer_.asio_buffer(),
customAllocReadHandler([weakSelf, bytesToReceive](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weakSelf.lock();
customAllocReadHandler([weak_self, bytesToReceive](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weak_self.lock();
if (self) {
self->handleRead(err, bytesTransferred, bytesToReceive);
}
Expand Down Expand Up @@ -787,11 +793,11 @@ void ClientConnection::processIncomingBuffer() {
// At least we need to read 4 bytes to have the complete frame size
uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();

auto weakSelf = weak_from_this();
auto weak_self = weak_from_this();
asyncReceive(incomingBuffer_.asio_buffer(),
customAllocReadHandler([weakSelf, minReadSize](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weakSelf.lock();
customAllocReadHandler([weak_self, minReadSize](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weak_self.lock();
if (self) {
self->handleRead(err, bytesTransferred, minReadSize);
}
Expand Down Expand Up @@ -1085,15 +1091,10 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
}

void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler(
[weakSelf, cmd](const boost::system::error_code& err, size_t bytesTransferred) {
auto self = weakSelf.lock();
if (self) {
self->handleSend(err, cmd);
}
}));
customAllocWriteHandler([this, self, cmd](const boost::system::error_code& err,
size_t bytesTransferred) { handleSend(err, cmd); }));
}

void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
Expand All @@ -1102,23 +1103,15 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
pendingWriteBuffers_.emplace_back(args);
return;
}
auto weakSelf = weak_from_this();
auto sendMessageInternal = [this, weakSelf, args] {
auto self = weakSelf.lock();
if (!self) {
return;
}
auto self = shared_from_this();
auto sendMessageInternal = [this, self, args] {
BaseCommand outgoingCmd;
auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the underlying socket send.
asyncWrite(buffer, customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err,
size_t bytesTransferred) {
auto self = weakSelf.lock();
if (self) {
self->handleSendPair(err);
}
}));
asyncWrite(buffer, customAllocWriteHandler(
[this, self, buffer](const boost::system::error_code& err,
size_t bytesTransferred) { handleSendPair(err); }));
};
if (tlsSocket_) {
#if BOOST_VERSION >= 106600
Expand All @@ -1132,6 +1125,9 @@ void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
}

void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
close(ResultDisconnected);
Expand All @@ -1141,6 +1137,9 @@ void ClientConnection::handleSend(const boost::system::error_code& err, const Sh
}

void ClientConnection::handleSendPair(const boost::system::error_code& err) {
if (isClosed()) {
return;
}
if (err) {
LOG_WARN(cnxString_ << "Could not send pair message on connection: " << err << " " << err.message());
close(ResultDisconnected);
Expand All @@ -1157,17 +1156,12 @@ void ClientConnection::sendPendingCommands() {
boost::any any = pendingWriteBuffers_.front();
pendingWriteBuffers_.pop_front();

auto weakSelf = weak_from_this();
auto self = shared_from_this();
if (any.type() == typeid(SharedBuffer)) {
SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
asyncWrite(
buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSend(err, buffer);
}
}));
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
size_t) { handleSend(err, buffer); }));
} else {
assert(any.type() == typeid(std::shared_ptr<SendArguments>));

Expand All @@ -1178,13 +1172,9 @@ void ClientConnection::sendPendingCommands() {

// Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
// callback is called, an invalid buffer range might be passed to the underlying socket send.
asyncWrite(buffer, customAllocWriteHandler(
[weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSendPair(err);
}
}));
asyncWrite(buffer,
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err,
size_t) { handleSendPair(err); }));
}
} else {
// No more pending writes
Expand Down Expand Up @@ -1334,10 +1324,11 @@ void ClientConnection::close(Result result, bool detach) {
}

lock.unlock();
int refCount = weak_from_this().use_count();
if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
}
// Remove the connection from the pool before completing any promise
if (detach) {
Expand Down Expand Up @@ -1824,13 +1815,10 @@ void ClientConnection::handleAuthChallenge() {
close(result);
return;
}
auto weakSelf = weak_from_this();
auto self = shared_from_this();
asyncWrite(buffer.const_asio_buffer(),
customAllocWriteHandler([weakSelf, buffer](const boost::system::error_code& err, size_t) {
auto self = weakSelf.lock();
if (self) {
self->handleSentAuthResponse(err, buffer);
}
customAllocWriteHandler([this, self, buffer](const boost::system::error_code& err, size_t) {
handleSentAuthResponse(err, buffer);
}));
}

Expand Down
9 changes: 8 additions & 1 deletion run-unit-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,17 @@ docker compose -f tests/oauth2/docker-compose.yml down

# Run BrokerMetadata tests
docker compose -f tests/brokermetadata/docker-compose.yml up -d
sleep 15
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
sleep 5
$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
docker compose -f tests/brokermetadata/docker-compose.yml down

docker compose -f tests/chunkdedup/docker-compose.yml up -d
until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
sleep 5
$CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
docker compose -f tests/chunkdedup/docker-compose.yml down

./pulsar-test-service-start.sh

pushd $CMAKE_BUILD_DIRECTORY/tests
Expand Down
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIB
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE "-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})

add_executable(ChunkDedupTest chunkdedup/ChunkDedupTest.cc)
target_link_libraries(ChunkDedupTest ${CLIENT_LIBS} pulsarStatic ${GTEST_LIBRARY_PATH})
53 changes: 53 additions & 0 deletions tests/chunkdedup/ChunkDedupTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include "lib/Latch.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

// Before https://github.com/apache/pulsar/pull/20948, when message deduplication is enabled, sending chunks
// to the broker will receive send error response.
TEST(ChunkDedupTest, testSendChunks) {
Client client{"pulsar://localhost:6650"};
ProducerConfiguration conf;
conf.setBatchingEnabled(false);
conf.setChunkingEnabled(true);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer("test-send-chunks", conf, producer));

Latch latch{1};
std::string value(1024000 /* max message size */ * 100, 'a');
producer.sendAsync(MessageBuilder().setContent(value).build(),
[&latch](Result result, const MessageId& msgId) {
LOG_INFO("Send to " << msgId << ": " << result);
latch.countdown();
});
ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
client.close();
}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
44 changes: 44 additions & 0 deletions tests/chunkdedup/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

version: '3'
networks:
pulsar:
driver: bridge
services:
standalone:
image: apachepulsar/pulsar:3.1.0 # Ensure https://github.com/apache/pulsar/pull/20948 is not included
container_name: standalone
hostname: local
restart: "no"
networks:
- pulsar
environment:
- metadataStoreUrl=zk:localhost:2181
- clusterName=standalone
- advertisedAddress=localhost
- advertisedListeners=external:pulsar://localhost:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
- PULSAR_PREFIX_maxMessageSize=1024000
- PULSAR_PREFIX_brokerDeduplicationEnabled=true
ports:
- "6650:6650"
- "8080:8080"
command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && exec bin/pulsar standalone -nss -nfw"

0 comments on commit 1beb597

Please sign in to comment.