From e0ae572e1b25222614515db89541e1786473dfa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Korina=20=C5=A0imi=C4=8Devi=C4=87?= Date: Tue, 6 Feb 2024 13:40:05 +0100 Subject: [PATCH] Cancellation fixes Summary: related to T11798 - Cancel functions post outstanding handlers - Properly cancel autoconnect stream Reviewers: ivica Reviewed By: ivica Subscribers: miljen, iljazovic Differential Revision: https://repo.mireo.local/D27763 --- include/async_mqtt5/detail/async_mutex.hpp | 16 +++-- include/async_mqtt5/impl/async_sender.hpp | 10 +++- .../async_mqtt5/impl/autoconnect_stream.hpp | 3 +- include/async_mqtt5/impl/client_service.hpp | 1 + include/async_mqtt5/impl/reconnect_op.hpp | 4 +- include/async_mqtt5/impl/replies.hpp | 2 +- test/include/test_common/test_stream.hpp | 6 ++ test/integration/cancellation.cpp | 59 +++++-------------- test/integration/coroutine.cpp | 4 -- 9 files changed, 48 insertions(+), 57 deletions(-) diff --git a/include/async_mqtt5/detail/async_mutex.hpp b/include/async_mqtt5/detail/async_mutex.hpp index d43fcbd..ac050ef 100644 --- a/include/async_mqtt5/detail/async_mutex.hpp +++ b/include/async_mqtt5/detail/async_mutex.hpp @@ -160,10 +160,14 @@ class async_mutex { _waiting.pop_front(); if (!op) continue; op.get_cancellation_slot().clear(); - auto ex = asio::get_associated_executor(op, _ex); - asio::require(ex, asio::execution::blocking.possibly) - .execute([op = std::move(op)]() mutable { - std::move(op)(asio::error::operation_aborted); + asio::require(_ex, asio::execution::blocking.never) + .execute([ex = _ex, op = std::move(op)]() mutable { + auto opex = asio::get_associated_executor(op, ex); + opex.execute( + [op = std::move(op)]() mutable { + op(asio::error::operation_aborted); + } + ); }); } } @@ -178,7 +182,9 @@ class async_mutex { .execute([ex = _ex, op = std::move(op)]() mutable { auto opex = asio::get_associated_executor(op, ex); opex.execute( - [op = std::move(op)]() mutable { op(error_code{}); } + [op = std::move(op)]() mutable { + op(error_code {}); + } ); }); } diff --git a/include/async_mqtt5/impl/async_sender.hpp b/include/async_mqtt5/impl/async_sender.hpp index 0fad16e..23653b2 100644 --- a/include/async_mqtt5/impl/async_sender.hpp +++ b/include/async_mqtt5/impl/async_sender.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,13 @@ class write_req { std::move(_handler)(ec); } + void complete_post(const asio::any_io_executor& ex, error_code ec) { + asio::post( + ex, + asio::prepend(std::move(_handler), ec) + ); + } + auto get_executor() { return asio::get_associated_executor(_handler); } @@ -140,7 +148,7 @@ class async_sender { void cancel() { auto ops = std::move(_write_queue); for (auto& op : ops) - op.complete(asio::error::operation_aborted); + op.complete_post(_svc.get_executor(), asio::error::operation_aborted); } void resend() { diff --git a/include/async_mqtt5/impl/autoconnect_stream.hpp b/include/async_mqtt5/impl/autoconnect_stream.hpp index 8dd2a0c..2356eb6 100644 --- a/include/async_mqtt5/impl/autoconnect_stream.hpp +++ b/include/async_mqtt5/impl/autoconnect_stream.hpp @@ -94,13 +94,14 @@ class autoconnect_stream { void cancel() { error_code ec; lowest_layer(*_stream_ptr).cancel(ec); + _conn_mtx.cancel(); + _connect_timer.cancel(); } void close() { error_code ec; shutdown(asio::ip::tcp::socket::shutdown_both); lowest_layer(*_stream_ptr).close(ec); - _connect_timer.cancel(); } void shutdown(asio::ip::tcp::socket::shutdown_type what) { diff --git a/include/async_mqtt5/impl/client_service.hpp b/include/async_mqtt5/impl/client_service.hpp index 41938ef..e2d75b3 100644 --- a/include/async_mqtt5/impl/client_service.hpp +++ b/include/async_mqtt5/impl/client_service.hpp @@ -362,6 +362,7 @@ class client_service { _rec_channel.close(); _replies.cancel_unanswered(); _async_sender.cancel(); + _stream.cancel(); _stream.close(); asio::get_associated_cancellation_slot(_run_handler).clear(); diff --git a/include/async_mqtt5/impl/reconnect_op.hpp b/include/async_mqtt5/impl/reconnect_op.hpp index df85211..44ad62a 100644 --- a/include/async_mqtt5/impl/reconnect_op.hpp +++ b/include/async_mqtt5/impl/reconnect_op.hpp @@ -181,7 +181,9 @@ class reconnect_op { private: void complete(error_code ec) { get_cancellation_slot().clear(); - _owner._conn_mtx.unlock(); + + if (ec != asio::error::operation_aborted) + _owner._conn_mtx.unlock(); std::move(_handler)(ec); } diff --git a/include/async_mqtt5/impl/replies.hpp b/include/async_mqtt5/impl/replies.hpp index 163ef6b..0905222 100644 --- a/include/async_mqtt5/impl/replies.hpp +++ b/include/async_mqtt5/impl/replies.hpp @@ -162,7 +162,7 @@ class replies { void cancel_unanswered() { auto ua = std::move(_handlers); for (auto& h : ua) - h.complete(asio::error::operation_aborted); + h.complete_post(_ex, asio::error::operation_aborted); } bool any_expired() { diff --git a/test/include/test_common/test_stream.hpp b/test/include/test_common/test_stream.hpp index adcd6f7..1072eab 100644 --- a/test/include/test_common/test_stream.hpp +++ b/test/include/test_common/test_stream.hpp @@ -52,6 +52,8 @@ class test_stream_impl { _test_broker = &asio::use_service(_ex.context()); } + void cancel(error_code&) {} + void close(error_code& ec) { disconnect(); ec = {}; @@ -248,6 +250,10 @@ class test_stream { _impl->open(p, ec); } + void cancel(error_code& ec) { + _impl->cancel(ec); + } + void close(error_code& ec) { _impl->close(ec); } diff --git a/test/integration/cancellation.cpp b/test/integration/cancellation.cpp index 2f245b7..b282294 100644 --- a/test/integration/cancellation.cpp +++ b/test/integration/cancellation.cpp @@ -28,8 +28,7 @@ enum operation_type { }; enum cancel_type { - ioc_stop = 1, - client_cancel, + client_cancel = 1, signal_emit }; @@ -49,7 +48,7 @@ void setup_cancel_op_test_case( asio::bind_cancellation_slot( signal.slot(), [&handlers_called](error_code ec) { - handlers_called++; + ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); } ) @@ -68,7 +67,7 @@ void setup_cancel_op_test_case( asio::bind_cancellation_slot( signal.slot(), [&handlers_called](error_code ec) { - handlers_called++; + ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); } ) @@ -89,10 +88,10 @@ void setup_cancel_op_test_case( [&handlers_called]( error_code ec, std::string t, std::string p, publish_props ) { - handlers_called++; - BOOST_TEST(ec == asio::error::operation_aborted); - BOOST_TEST(t == ""); - BOOST_TEST(p == ""); + ++handlers_called; + BOOST_TEST(ec == asio::error::operation_aborted); + BOOST_TEST(t == ""); + BOOST_TEST(p == ""); } ) ); @@ -113,7 +112,7 @@ void setup_cancel_op_test_case( [&handlers_called]( error_code ec, std::vector rcs, unsuback_props ) { - handlers_called++; + ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST_REQUIRE(rcs.size() == 1u); BOOST_TEST(rcs[0] == reason_codes::empty); @@ -137,7 +136,7 @@ void setup_cancel_op_test_case( [&handlers_called]( error_code ec, std::vector rcs, suback_props ) { - handlers_called++; + ++handlers_called; BOOST_TEST(ec == asio::error::operation_aborted); BOOST_TEST_REQUIRE(rcs.size() == 1u); BOOST_TEST(rcs[0] == reason_codes::empty); @@ -150,39 +149,31 @@ template void run_cancel_op_test() { using namespace test; - constexpr int expected_handlers_called = c_type == ioc_stop ? 0 : 1; + constexpr int expected_handlers_called = 1; int handlers_called = 0; asio::io_context ioc; + asio::cancellation_signal signal; client_type c(ioc, ""); c.brokers("127.0.0.1"); - asio::cancellation_signal signal; setup_cancel_op_test_case(c, signal, handlers_called); asio::steady_timer timer(c.get_executor()); - timer.expires_after(std::chrono::milliseconds(10)); + timer.expires_after(std::chrono::milliseconds(100)); timer.async_wait([&](auto) { - if constexpr (c_type == ioc_stop) - ioc.stop(); - else if constexpr (c_type == client_cancel) + if constexpr (c_type == client_cancel) c.cancel(); else if constexpr (c_type == signal_emit) signal.emit(asio::cancellation_type_t::terminal); }); - - ioc.run_for(std::chrono::seconds(1)); - BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called); + ioc.run(); + BOOST_TEST(handlers_called == expected_handlers_called); } BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/) -// hangs -BOOST_AUTO_TEST_CASE(ioc_stop_async_run, *boost::unit_test::disabled()) { - run_cancel_op_test(); -} - BOOST_AUTO_TEST_CASE(client_cancel_async_run) { run_cancel_op_test(); } @@ -191,11 +182,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_run) { run_cancel_op_test(); } -// hangs -BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled()) { - run_cancel_op_test(); -} - BOOST_AUTO_TEST_CASE(client_cancel_async_publish) { run_cancel_op_test(); } @@ -204,11 +190,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_publish) { run_cancel_op_test(); } -// hangs after ping changes -BOOST_AUTO_TEST_CASE(ioc_stop_async_receive, *boost::unit_test::disabled()) { - run_cancel_op_test(); -} - BOOST_AUTO_TEST_CASE(client_cancel_async_receive) { run_cancel_op_test(); } @@ -218,11 +199,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_receive, *boost::unit_test::disabled()) { run_cancel_op_test(); } -// hangs -BOOST_AUTO_TEST_CASE(ioc_stop_async_subscribe, *boost::unit_test::disabled()) { - run_cancel_op_test(); -} - BOOST_AUTO_TEST_CASE(client_cancel_async_subscribe) { run_cancel_op_test(); } @@ -231,11 +207,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_subscribe) { run_cancel_op_test(); } -// hangs -BOOST_AUTO_TEST_CASE(ioc_stop_async_unsubscribe, *boost::unit_test::disabled()) { - run_cancel_op_test(); -} - BOOST_AUTO_TEST_CASE(client_cancel_async_unsubscribe) { run_cancel_op_test(); } diff --git a/test/integration/coroutine.cpp b/test/integration/coroutine.cpp index 6fc0a71..e7cd79f 100644 --- a/test/integration/coroutine.cpp +++ b/test/integration/coroutine.cpp @@ -123,7 +123,6 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) { [&](boost::system::error_code ec) { BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); - ioc.stop(); } ); @@ -159,7 +158,6 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) { [&](boost::system::error_code ec) { BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); - ioc.stop(); } ); @@ -195,7 +193,6 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) { [&](boost::system::error_code ec) { BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); - ioc.stop(); } ); @@ -233,7 +230,6 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) { [&](boost::system::error_code ec) { BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!"); c.cancel(); - ioc.stop(); } );