[Bug] Segmentation fault when sending messages after receiving an error #325

Closed
2 tasks done
BewareMyPower opened this issue Oct 8, 2023 · 1 comment · Fixed by #326
BewareMyPower commented Oct 8, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

Additional broker configs:

brokerDeduplicationEnabled=true
maxMessageSize=1024000

Minimal reproduce step

To trigger the send error, run the following code against a Pulsar release that does not include apache/pulsar#20948:

#include <pulsar/Client.h>

#include <chrono>
#include <iostream>
#include <thread>

using namespace pulsar;

int main() {
    Client client("pulsar://127.0.0.1:6650");

    Producer producer;
    ProducerConfiguration conf;
    conf.setBatchingEnabled(false);
    conf.setChunkingEnabled(true);
    client.createProducer("persistent://public/default/my-topic", conf, producer);

    producer.sendAsync(MessageBuilder().setContent(std::string(1024000 * 5, 'a')).build(),
                       [](Result result, const MessageId& msgId) {
                           std::cout << "XYZ send " << result << " " << msgId << std::endl;
                       });
    std::this_thread::sleep_for(std::chrono::seconds(1));

    client.close();
}

What did you expect to see?

The application exits normally.

What did you see instead?

2023-10-08 11:41:38.699 WARN  [0x16f05b000] ClientConnection:1529 | [127.0.0.1:55532 -> 127.0.0.1:6650] Received send error from server: Cannot determine whether the message is a duplicate at this time
2023-10-08 11:41:38.699 INFO  [0x16f05b000] ClientConnection:1340 | [127.0.0.1:55532 -> 127.0.0.1:6650] Connection disconnected
2023-10-08 11:41:38.699 INFO  [0x16f05b000] HandlerBase:141 | [persistent://public/default/test-send-chunks, standalone-12-4] Schedule reconnection in 0.1 s
2023-10-08 11:41:38.699 INFO  [0x16f05b000] ConnectionPool:122 | Remove connection for pulsar://127.0.0.1:6650
2023-10-08 11:41:38.699 INFO  [0x16f05b000] ClientConnection:268 | [127.0.0.1:55532 -> 127.0.0.1:6650] Destroyed connection to pulsar://127.0.0.1:6650

You can also see the failure reproduced in this workflow:

[----------] 1 test from ChunkDedupTest
[ RUN      ] ChunkDedupTest.testSendChunks
2023-10-08 03:32:50.574 INFO  [139665533579648] ClientConnection:188 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2023-10-08 03:32:50.574 INFO  [139665533579648] ConnectionPool:107 | Created connection for pulsar://localhost:6650
2023-10-08 03:32:50.578 INFO  [139665518995008] ClientConnection:398 | [[::1]:43874 -> [::1]:6650] Connected to broker
2023-10-08 03:32:50.655 INFO  [139665518995008] HandlerBase:81 | [persistent://public/default/test-send-chunks, ] Getting connection from pool
2023-10-08 03:32:51.124 INFO  [139665518995008] ProducerImpl:199 | [persistent://public/default/test-send-chunks, ] Created producer on broker [[::1]:43874 -> [::1]:6650]
2023-10-08 03:32:51.293 WARN  [139665518995008] ClientConnection:1528 | [[::1]:43874 -> [::1]:6650] Received send error from server: Cannot determine whether the message is a duplicate at this time
2023-10-08 03:32:51.294 INFO  [139665518995008] ClientConnection:1340 | [[::1]:43874 -> [::1]:6650] Connection disconnected
2023-10-08 03:32:51.294 INFO  [139665518995008] ConnectionPool:122 | Remove connection for pulsar://localhost:6650
2023-10-08 03:32:51.294 INFO  [139665518995008] HandlerBase:141 | [persistent://public/default/test-send-chunks, standalone-0-0] Schedule reconnection in 0.1 s
2023-10-08 03:32:51.295 INFO  [139665518995008] ClientConnection:268 | [[::1]:43874 -> [::1]:6650] Destroyed connection to pulsar://localhost:6650
./run-unit-tests.sh: line 51:  6925 Segmentation fault      (core dumped) $CMAKE_BUILD_DIRECTORY/tests/ChunkDedupTest --gtest_repeat=10
Error: Process completed with exit code 139.

Anything else?

The root cause might be #317. I tried reverting that commit in my local env and it never crashed.

Here is also a similar crash report: #324

Are you willing to submit a PR?

  • I'm willing to submit a PR!
BewareMyPower self-assigned this Oct 8, 2023

BewareMyPower commented

Here is the LLDB debug result:

Process 15050 stopped
* thread #2, stop reason = EXC_BAD_ACCESS (code=1, address=0xbeaddccd09f0)
    frame #0: 0x0000000102ccfa2c libpulsar.dylib`boost::asio::detail::reactor_op::reactor_op(this=0x00000001010110b0, success_ec=0x0000beaddccd09f0, perform_func=(libpulsar.dylib`boost::asio::detail::reactive_socket_send_op_base<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> >::do_perform(boost::asio::detail::reactor_op*) at reactive_socket_send_op.hpp:53), complete_func=(libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) at reactive_socket_send_op.hpp:124))(boost::asio::detail::reactor_op*), void (*)(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long)) at reactor_op.hpp:56:7
   53  	  reactor_op(const boost::system::error_code& success_ec,
   54  	      perform_func_type perform_func, func_type complete_func)
   55  	    : operation(complete_func),
-> 56  	      ec_(success_ec),
   57  	      cancellation_key_(0),
   58  	      bytes_transferred_(0),
   59  	      perform_func_(perform_func)
Target 0: (ChunkDedupTest) stopped.
(lldb) bt
* thread #2, stop reason = EXC_BAD_ACCESS (code=1, address=0xbeaddccd09f0)
  * frame #0: 0x0000000102ccfa2c libpulsar.dylib`boost::asio::detail::reactor_op::reactor_op(this=0x00000001010110b0, success_ec=0x0000beaddccd09f0, perform_func=(libpulsar.dylib`boost::asio::detail::reactive_socket_send_op_base<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> >::do_perform(boost::asio::detail::reactor_op*) at reactive_socket_send_op.hpp:53), complete_func=(libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) at reactive_socket_send_op.hpp:124))(boost::asio::detail::reactor_op*), void (*)(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long)) at reactor_op.hpp:56:7
    frame #1: 0x0000000102cf042c libpulsar.dylib`boost::asio::detail::reactive_socket_send_op_base<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> >::reactive_socket_send_op_base(this=0x00000001010110b0, success_ec=0x0000beaddccd09f0, socket=29, state='\0', buffers=0x000000016fe864a8, flags=0, complete_func=(libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) at reactive_socket_send_op.hpp:124))(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long)) at reactive_socket_send_op.hpp:43:7
    frame #2: 0x0000000102d5245c libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::reactive_socket_send_op(this=0x00000001010110b0, success_ec=0x0000beaddccd09f0, socket=29, state='\0', buffers=0x000000016fe864a8, flags=0, handler=0x000000016fe866f8, io_ex=0x00006000026009e0)::$_20> >&, boost::asio::any_io_executor const&) at reactive_socket_send_op.hpp:114:7
    frame #3: 0x0000000102d52058 libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::reactive_socket_send_op(this=0x00000001010110b0, success_ec=0x0000beaddccd09f0, socket=29, state='\0', buffers=0x000000016fe864a8, flags=0, handler=0x000000016fe866f8, io_ex=0x00006000026009e0)::$_20> >&, boost::asio::any_io_executor const&) at reactive_socket_send_op.hpp:118:3
    frame #4: 0x0000000102d51e00 libpulsar.dylib`void boost::asio::detail::reactive_socket_service_base::async_send<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>(this=0x0000beaddccd09e8, impl=0x00006000026009c8, buffers=0x000000016fe864a8, flags=0, handler=0x000000016fe866f8, io_ex=0x00006000026009e0)::$_20> >&, boost::asio::any_io_executor const&) at reactive_socket_service_base.hpp:301:21
    frame #5: 0x0000000102d51d28 libpulsar.dylib`void boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send::operator(this=0x000000016fe86420, handler=0x000000016fe866f8, buffers=0x000000016fe864a8, flags=0)<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> >(boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&&, boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, int) const at basic_stream_socket.hpp:1142:34
    frame #6: 0x0000000102d51c88 libpulsar.dylib`void boost::asio::detail::completion_handler_async_result<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, void (boost::system::error_code, unsigned long)>::initiate<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands(initiation=0x000000016fe86420, token=0x000000016fe866f8, args=0x000000016fe864a8, args=0x000000016fe8641c)::$_20> >, boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, int>(boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send&&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&&, boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, int&&) at async_result.hpp:482:5
    frame #7: 0x0000000102d51c48 libpulsar.dylib`boost::asio::constraint<detail::async_result_has_initiate_memfn<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, void (boost::system::error_code, unsigned long)>::value, decltype(async_result<std::__1::decay<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> > >::type, void (boost::system::error_code, unsigned long)>::initiate(declval<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send&&>(), declval<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&&>(), declval<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&>(), declval<int&&>()))>::type boost::asio::async_initiate<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands(initiation=0x000000016fe86420, token=0x000000016fe866f8, args=0x000000016fe864a8, args=0x000000016fe8641c)::$_20> >, void (boost::system::error_code, unsigned long), boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send, 
boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, int>(boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::initiate_async_send&&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&, boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, int&&) at async_result.hpp:895:10
    frame #8: 0x0000000102d51bf8 libpulsar.dylib`auto boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>::async_write_some<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands(this=0x00006000026009c0, buffers=0x000000016fe864a8, token=0x000000016fe866f8)::$_20> > >(boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul> const&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&&) at basic_stream_socket.hpp:970:12
    frame #9: 0x0000000102d519c8 libpulsar.dylib`boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >::operator(this=0x000000016fe866f8, ec=error_code @ 0x000000016fe865f0, bytes_transferred=65536, start=0)(boost::system::error_code, unsigned long, int) at write.hpp:345:21
    frame #10: 0x0000000102d52c1c libpulsar.dylib`boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long>::operator(this=0x000000016fe866f8)() at bind_handler.hpp:289:5
    frame #11: 0x0000000102d52bd8 libpulsar.dylib`void boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long> >(function=0x000000016fe866f8)::$_20> >, boost::system::error_code, unsigned long>&, ...) at handler_invoke_hook.hpp:88:3
    frame #12: 0x0000000102d52bb4 libpulsar.dylib`void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long>, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >(function=0x000000016fe866f8, context=0x000000016fe86798)::$_20> >, boost::system::error_code, unsigned long>&, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20>&) at handler_invoke_helpers.hpp:54:3
    frame #13: 0x0000000102d52b80 libpulsar.dylib`void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long>, boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >(function=0x000000016fe866f8, this_handler=0x000000016fe866f8)::$_20> >, boost::system::error_code, unsigned long>&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >*) at write.hpp:430:5
    frame #14: 0x0000000102d52b18 libpulsar.dylib`void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> > >(function=0x000000016fe866f8, context=0x000000016fe866f8)::$_20> >, boost::system::error_code, unsigned long>&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&) at handler_invoke_helpers.hpp:54:3
    frame #15: 0x0000000102d52864 libpulsar.dylib`void boost::asio::detail::handler_work<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor, void>::complete<boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands(this=0x000000016fe86880, function=0x000000016fe866f8, handler=0x000000016fe866f8)::$_20> >, boost::system::error_code, unsigned long> >(boost::asio::detail::binder2<boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::system::error_code, unsigned long>&, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >&) at handler_work.hpp:524:7
    frame #16: 0x0000000102d525cc libpulsar.dylib`boost::asio::detail::reactive_socket_send_op<boost::asio::detail::prepared_buffers<boost::asio::const_buffer, 64ul>, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::any_io_executor>, pulsar::CompositeSharedBuffer<2>, boost::asio::const_buffer const*, boost::asio::detail::transfer_all_t, AllocHandler<pulsar::ClientConnection::sendPendingCommands()::$_20> >, boost::asio::any_io_executor>::do_complete(owner=0x0000000100f05460, base=0x00000001010110b0, (null)=0x000000016fe86dd0, (null)=0) at reactive_socket_send_op.hpp:155:9
    frame #17: 0x0000000102ccdfd4 libpulsar.dylib`boost::asio::detail::scheduler_operation::complete(this=0x00000001010110b0, owner=0x0000000100f05460, ec=0x000000016fe86dd0, bytes_transferred=0) at scheduler_operation.hpp:40:5
    frame #18: 0x0000000102cdec40 libpulsar.dylib`boost::asio::detail::scheduler::do_run_one(this=0x0000000100f05460, lock=0x000000016fe869e0, this_thread=0x000000016fe86a18, ec=0x000000016fe86dd0) at scheduler.ipp:493:12
    frame #19: 0x0000000102cde8ec libpulsar.dylib`boost::asio::detail::scheduler::run(this=0x0000000100f05460, ec=0x000000016fe86dd0) at scheduler.ipp:210:10
    frame #20: 0x0000000102ebd804 libpulsar.dylib`boost::asio::io_context::run(this=0x0000600003700268, ec=0x000000016fe86dd0) at io_context.ipp:72:16
    frame #21: 0x0000000102ebd3d4 libpulsar.dylib`pulsar::ExecutorService::start(this=0x00006000002139e8)::$_0::operator()() const at ExecutorService.cc:39:25
    frame #22: 0x0000000102ebd26c libpulsar.dylib`decltype(__f=0x00006000002139e8)::$_0>()()) std::__1::__invoke[abi:v15006]<pulsar::ExecutorService::start()::$_0>(pulsar::ExecutorService::start()::$_0&&) at invoke.h:394:23
    frame #23: 0x0000000102ebd248 libpulsar.dylib`void std::__1::__thread_execute[abi:v15006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, pulsar::ExecutorService::start()::$_0>(__t=size=2, (null)=__tuple_indices<> @ 0x000000016fe86f7f)::$_0>&, std::__1::__tuple_indices<>) at thread:290:5
    frame #24: 0x0000000102ebcecc libpulsar.dylib`void* std::__1::__thread_proxy[abi:v15006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, pulsar::ExecutorService::start()::$_0> >(__vp=0x00006000002139e0) at thread:301:5
    frame #25: 0x000000019c70bfa8 libsystem_pthread.dylib`_pthread_start + 148

The underlying implementation of boost::asio::async_write(socket, ...) is:

  1. Register a write_op object, which wraps the buffer, with the socket. See the following code from boost/asio/impl/write.hpp in Boost 1.82:
  template <typename AsyncWriteStream>
  class initiate_async_write
  {
  public:
/* ... */
    template <typename WriteHandler, typename ConstBufferSequence,
        typename CompletionCondition>
    void operator()(BOOST_ASIO_MOVE_ARG(WriteHandler) handler,
        const ConstBufferSequence& buffers,
        BOOST_ASIO_MOVE_ARG(CompletionCondition) completion_cond) const
    {
/* ... */
      start_write_op(stream_, buffers,  // [2], stream_ is the socket
          boost::asio::buffer_sequence_begin(buffers),
          completion_cond2.value, handler2.value);
    }
/* ... */
    void (boost::system::error_code, std::size_t))
async_write(AsyncWriteStream& s, const ConstBufferSequence& buffers,
/* ... */
{
  return async_initiate<WriteToken,
    void (boost::system::error_code, std::size_t)>(
      detail::initiate_async_write<AsyncWriteStream>(s),  // [1]
      token, buffers,
      BOOST_ASIO_MOVE_CAST(CompletionCondition)(completion_condition));
}
  2. Call socket.async_write_some in a loop until the whole buffer has been sent. See frame 9 in the stack trace above. (This is why the crash cannot be reproduced with small messages: a single async_write_some call can send all the bytes at once.)
// frame 9, boost/asio/impl/write.hpp (Boost 1.82.0)
   342 	        {
   343 	          {
   344 	            BOOST_ASIO_HANDLER_LOCATION((__FILE__, __LINE__, "async_write"));
-> 345 	            stream_.async_write_some(buffers_.prepare(max_size),
   346 	                BOOST_ASIO_MOVE_CAST(write_op)(*this));
   347 	          }
   348 	          return; default:
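
To make the "large messages only" point concrete, here is a minimal, synchronous model of that loop. It is not Boost's code: `FakeStream`, `writeSome` and `countWriteCalls` are hypothetical names, and the 65536-byte limit per call is only borrowed from `bytes_transferred=65536` in frame 9 for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for a socket: each write accepts at most
// `maxPerCall` bytes, similar to a send buffer filling up.
struct FakeStream {
    std::size_t maxPerCall;
    std::size_t writeSome(std::size_t remaining) const {
        return std::min(remaining, maxPerCall);
    }
};

// Simplified, synchronous model of the composed write: keep calling
// writeSome until every byte is sent, and count the calls. In Asio, every
// call after the first is issued from a completion callback, which is the
// point where a destroyed socket would be touched.
std::size_t countWriteCalls(const FakeStream& stream, std::size_t totalBytes) {
    std::size_t sent = 0;
    std::size_t calls = 0;
    while (sent < totalBytes) {
        const std::size_t n = stream.writeSome(totalBytes - sent);
        assert(n > 0);  // a stream that accepts nothing would loop forever
        sent += n;
        ++calls;
    }
    return calls;
}
```

Under these assumptions, a 1000-byte payload needs a single call, so no follow-up callback ever touches the socket, while a 1024000-byte payload needs 16 calls.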

As we can see, the application crashed in async_write_some after the ClientConnection object was destroyed (see the "Destroyed connection to pulsar://127.0.0.1:6650" log before the crash). After the socket is closed, the write_op breaks out of the loop with operation_aborted and then triggers a callback in the event loop.

// boost/asio/impl/write.hpp
          if (this->cancelled() != cancellation_type::none)
          {
            ec = error::operation_aborted;
            break;
          }

In ClientConnection::close, the ClientConnection object will be removed from the pool via pool_.remove. Since only the ConnectionPool owns the ClientConnection, the ClientConnection object, as well as the socket_ field, will be destroyed after it's removed from the pool.

However, the write_op object is registered with the socket object (stream_ here), so the next time the Asio event loop executes the callback, the address of a destroyed socket is accessed. Before #317, the ClientConnection was owned by the callback rather than by the pool: the ClientConnection object was passed into each subsequent callback and was destroyed only after the last callback that owned it had completed.
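
The ownership difference can be sketched with the standard library alone. This is a toy model, not the pulsar-client-cpp code: `Connection`, `EventLoop` and `runScenario` are hypothetical names, the "event loop" is just a queue of callbacks, and a `weak_ptr` check stands in for the dangling socket access so that the sketch stays free of undefined behavior.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in; not the real ClientConnection.
struct Connection {
    explicit Connection(std::vector<std::string>& log) : log_(log) {}
    ~Connection() { log_.push_back("destroyed"); }
    std::vector<std::string>& log_;
};

using EventLoop = std::queue<std::function<void()>>;

// Runs one scenario: a write callback is queued, the pool drops its
// reference (as pool_.remove would), then the event loop runs the callback.
std::vector<std::string> runScenario(bool callbackOwnsConnection) {
    std::vector<std::string> log;
    EventLoop loop;
    auto pooled = std::make_shared<Connection>(log);  // the pool's reference

    if (callbackOwnsConnection) {
        // The callback holds a shared_ptr, so the connection outlives it.
        loop.push([self = pooled, &log] {
            (void)self;  // held only to extend the connection's lifetime
            log.push_back("callback: connection alive");
        });
    } else {
        // The callback holds only a weak_ptr, so the pool is the sole owner.
        std::weak_ptr<Connection> weak = pooled;
        loop.push([weak, &log] {
            log.push_back(weak.lock() ? "callback: connection alive"
                                      : "callback: connection gone");
        });
    }

    pooled.reset();  // the pool removes the connection
    while (!loop.empty()) {
        auto task = std::move(loop.front());
        loop.pop();
        task();
    }
    return log;
}
```

With `runScenario(false)` the log reads "destroyed" and then "callback: connection gone", which corresponds to the crash window. With `runScenario(true)` the callback runs first and the connection is destroyed only afterwards.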

BewareMyPower changed the title from "[Bug] Segmentation fault when sending messages after receiving send error" to "[Bug] Segmentation fault when sending messages after receiving an error" on Oct 8, 2023
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue Oct 8, 2023
Fixes apache#325

### Motivation

apache#317 introduces a bug
that might cause segmentation fault when sending messages after
receiving an error, see
apache#325 (comment)
for the detailed explanation.

### Modifications

When calling `asyncWrite`, capture the `shared_ptr` instead of the
`weak_ptr` to extend the lifetime of the `socket_` or `tlsSocket_` field
in `ClientConnection`. Since the lifetime is extended, in some
callbacks, check `isClosed()` before other logic.

Add a `ChunkDedupTest` to reproduce this issue based on Pulsar 3.1.0.
Run the test for 10 times to ensure it won't crash after this patch.
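
As a rough illustration of the pattern described under Modifications (not the actual patch), the sketch below captures a `shared_ptr` in the write callback and checks `isClosed()` before any other logic. `MiniConnection`, `makeWriteCallback` and `handledWrites` are hypothetical names; only `isClosed()` is taken from the commit message.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical miniature of a connection; not the real ClientConnection.
class MiniConnection : public std::enable_shared_from_this<MiniConnection> {
   public:
    void close() { closed_ = true; }
    bool isClosed() const { return closed_; }
    int handledWrites() const { return handledWrites_; }

    // Builds a write-completion callback that keeps this object alive.
    std::function<void()> makeWriteCallback() {
        auto self = shared_from_this();
        return [self] {
            if (self->isClosed()) {
                return;  // still alive, but closed: skip the remaining logic
            }
            ++self->handledWrites_;
        };
    }

   private:
    std::atomic<bool> closed_{false};
    int handledWrites_ = 0;
};
```

Because the callback now owns the connection, it can still run after `close()` and after the pool has dropped its reference. The early return keeps it from acting on a closed connection, and the object is released once the last callback holding it is destroyed.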
BewareMyPower added a commit to streamnative/pulsar-client-cpp that referenced this issue Oct 8, 2023
BewareMyPower added a commit that referenced this issue Oct 9, 2023