Skip to content

Commit

Permalink
Cancellation fixes
Browse files Browse the repository at this point in the history
Summary:
related to T11798
- Cancel functions post outstanding handlers
- Properly cancel autoconnect stream

Reviewers: ivica

Reviewed By: ivica

Subscribers: miljen, iljazovic

Differential Revision: https://repo.mireo.local/D27763
  • Loading branch information
ksimicevic committed Feb 6, 2024
1 parent 0de02e3 commit e0ae572
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 57 deletions.
16 changes: 11 additions & 5 deletions include/async_mqtt5/detail/async_mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,14 @@ class async_mutex {
_waiting.pop_front();
if (!op) continue;
op.get_cancellation_slot().clear();
auto ex = asio::get_associated_executor(op, _ex);
asio::require(ex, asio::execution::blocking.possibly)
.execute([op = std::move(op)]() mutable {
std::move(op)(asio::error::operation_aborted);
asio::require(_ex, asio::execution::blocking.never)
.execute([ex = _ex, op = std::move(op)]() mutable {
auto opex = asio::get_associated_executor(op, ex);
opex.execute(
[op = std::move(op)]() mutable {
op(asio::error::operation_aborted);
}
);
});
}
}
Expand All @@ -178,7 +182,9 @@ class async_mutex {
.execute([ex = _ex, op = std::move(op)]() mutable {
auto opex = asio::get_associated_executor(op, ex);
opex.execute(
[op = std::move(op)]() mutable { op(error_code{}); }
[op = std::move(op)]() mutable {
op(error_code {});
}
);
});
}
Expand Down
10 changes: 9 additions & 1 deletion include/async_mqtt5/impl/async_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <boost/asio/bind_allocator.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/asio/ip/tcp.hpp>

Expand Down Expand Up @@ -46,6 +47,13 @@ class write_req {
std::move(_handler)(ec);
}

void complete_post(const asio::any_io_executor& ex, error_code ec) {
asio::post(
ex,
asio::prepend(std::move(_handler), ec)
);
}

auto get_executor() {
return asio::get_associated_executor(_handler);
}
Expand Down Expand Up @@ -140,7 +148,7 @@ class async_sender {
void cancel() {
auto ops = std::move(_write_queue);
for (auto& op : ops)
op.complete(asio::error::operation_aborted);
op.complete_post(_svc.get_executor(), asio::error::operation_aborted);
}

void resend() {
Expand Down
3 changes: 2 additions & 1 deletion include/async_mqtt5/impl/autoconnect_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ class autoconnect_stream {
void cancel() {
error_code ec;
lowest_layer(*_stream_ptr).cancel(ec);
_conn_mtx.cancel();
_connect_timer.cancel();
}

void close() {
error_code ec;
shutdown(asio::ip::tcp::socket::shutdown_both);
lowest_layer(*_stream_ptr).close(ec);
_connect_timer.cancel();
}

void shutdown(asio::ip::tcp::socket::shutdown_type what) {
Expand Down
1 change: 1 addition & 0 deletions include/async_mqtt5/impl/client_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ class client_service {
_rec_channel.close();
_replies.cancel_unanswered();
_async_sender.cancel();
_stream.cancel();
_stream.close();

asio::get_associated_cancellation_slot(_run_handler).clear();
Expand Down
4 changes: 3 additions & 1 deletion include/async_mqtt5/impl/reconnect_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ class reconnect_op {
private:
void complete(error_code ec) {
get_cancellation_slot().clear();
_owner._conn_mtx.unlock();

if (ec != asio::error::operation_aborted)
_owner._conn_mtx.unlock();

std::move(_handler)(ec);
}
Expand Down
2 changes: 1 addition & 1 deletion include/async_mqtt5/impl/replies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class replies {
void cancel_unanswered() {
auto ua = std::move(_handlers);
for (auto& h : ua)
h.complete(asio::error::operation_aborted);
h.complete_post(_ex, asio::error::operation_aborted);
}

bool any_expired() {
Expand Down
6 changes: 6 additions & 0 deletions test/include/test_common/test_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class test_stream_impl {
_test_broker = &asio::use_service<test_broker>(_ex.context());
}

void cancel(error_code&) {}

void close(error_code& ec) {
disconnect();
ec = {};
Expand Down Expand Up @@ -248,6 +250,10 @@ class test_stream {
_impl->open(p, ec);
}

void cancel(error_code& ec) {
_impl->cancel(ec);
}

void close(error_code& ec) {
_impl->close(ec);
}
Expand Down
59 changes: 15 additions & 44 deletions test/integration/cancellation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ enum operation_type {
};

enum cancel_type {
ioc_stop = 1,
client_cancel,
client_cancel = 1,
signal_emit
};

Expand All @@ -49,7 +48,7 @@ void setup_cancel_op_test_case(
asio::bind_cancellation_slot(
signal.slot(),
[&handlers_called](error_code ec) {
handlers_called++;
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
}
)
Expand All @@ -68,7 +67,7 @@ void setup_cancel_op_test_case(
asio::bind_cancellation_slot(
signal.slot(),
[&handlers_called](error_code ec) {
handlers_called++;
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
}
)
Expand All @@ -89,10 +88,10 @@ void setup_cancel_op_test_case(
[&handlers_called](
error_code ec, std::string t, std::string p, publish_props
) {
handlers_called++;
BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST(t == "");
BOOST_TEST(p == "");
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST(t == "");
BOOST_TEST(p == "");
}
)
);
Expand All @@ -113,7 +112,7 @@ void setup_cancel_op_test_case(
[&handlers_called](
error_code ec, std::vector<reason_code> rcs, unsuback_props
) {
handlers_called++;
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST_REQUIRE(rcs.size() == 1u);
BOOST_TEST(rcs[0] == reason_codes::empty);
Expand All @@ -137,7 +136,7 @@ void setup_cancel_op_test_case(
[&handlers_called](
error_code ec, std::vector<reason_code> rcs, suback_props
) {
handlers_called++;
++handlers_called;
BOOST_TEST(ec == asio::error::operation_aborted);
BOOST_TEST_REQUIRE(rcs.size() == 1u);
BOOST_TEST(rcs[0] == reason_codes::empty);
Expand All @@ -150,39 +149,31 @@ template<test::cancel_type c_type, test::operation_type op_type>
void run_cancel_op_test() {
using namespace test;

constexpr int expected_handlers_called = c_type == ioc_stop ? 0 : 1;
constexpr int expected_handlers_called = 1;
int handlers_called = 0;

asio::io_context ioc;
asio::cancellation_signal signal;
client_type c(ioc, "");
c.brokers("127.0.0.1");

asio::cancellation_signal signal;
setup_cancel_op_test_case<op_type>(c, signal, handlers_called);

asio::steady_timer timer(c.get_executor());
timer.expires_after(std::chrono::milliseconds(10));
timer.expires_after(std::chrono::milliseconds(100));
timer.async_wait([&](auto) {
if constexpr (c_type == ioc_stop)
ioc.stop();
else if constexpr (c_type == client_cancel)
if constexpr (c_type == client_cancel)
c.cancel();
else if constexpr (c_type == signal_emit)
signal.emit(asio::cancellation_type_t::terminal);
});


ioc.run_for(std::chrono::seconds(1));
BOOST_CHECK_EQUAL(handlers_called, expected_handlers_called);
ioc.run();
BOOST_TEST(handlers_called == expected_handlers_called);
}

BOOST_AUTO_TEST_SUITE(cancellation/*, *boost::unit_test::disabled()*/)

// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_run, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::async_run>();
}

BOOST_AUTO_TEST_CASE(client_cancel_async_run) {
run_cancel_op_test<test::client_cancel, test::async_run>();
}
Expand All @@ -191,11 +182,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_run) {
run_cancel_op_test<test::signal_emit, test::async_run>();
}

// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_publish, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::publish>();
}

BOOST_AUTO_TEST_CASE(client_cancel_async_publish) {
run_cancel_op_test<test::client_cancel, test::publish>();
}
Expand All @@ -204,11 +190,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_publish) {
run_cancel_op_test<test::signal_emit, test::publish>();
}

// hangs after ping changes
BOOST_AUTO_TEST_CASE(ioc_stop_async_receive, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::receive>();
}

BOOST_AUTO_TEST_CASE(client_cancel_async_receive) {
run_cancel_op_test<test::client_cancel, test::receive>();
}
Expand All @@ -218,11 +199,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_receive, *boost::unit_test::disabled()) {
run_cancel_op_test<test::signal_emit, test::receive>();
}

// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_subscribe, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::subscribe>();
}

BOOST_AUTO_TEST_CASE(client_cancel_async_subscribe) {
run_cancel_op_test<test::client_cancel, test::subscribe>();
}
Expand All @@ -231,11 +207,6 @@ BOOST_AUTO_TEST_CASE(signal_emit_async_subscribe) {
run_cancel_op_test<test::signal_emit, test::subscribe>();
}

// hangs
BOOST_AUTO_TEST_CASE(ioc_stop_async_unsubscribe, *boost::unit_test::disabled()) {
run_cancel_op_test<test::ioc_stop, test::unsubscribe>();
}

BOOST_AUTO_TEST_CASE(client_cancel_async_unsubscribe) {
run_cancel_op_test<test::client_cancel, test::unsubscribe>();
}
Expand Down
4 changes: 0 additions & 4 deletions test/integration/coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ BOOST_AUTO_TEST_CASE(tcp_client_check) {
[&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
);

Expand Down Expand Up @@ -159,7 +158,6 @@ BOOST_AUTO_TEST_CASE(websocket_tcp_client_check) {
[&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
);

Expand Down Expand Up @@ -195,7 +193,6 @@ BOOST_AUTO_TEST_CASE(openssl_tls_client_check) {
[&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
);

Expand Down Expand Up @@ -233,7 +230,6 @@ BOOST_AUTO_TEST_CASE(websocket_tls_client_check) {
[&](boost::system::error_code ec) {
BOOST_TEST_WARN(ec, "Failed to receive all the expected replies!");
c.cancel();
ioc.stop();
}
);

Expand Down

0 comments on commit e0ae572

Please sign in to comment.