Skip to content

Commit

Permalink
Remove Beast dependency from rebind_executor.hpp and async_traits.hpp
Browse files Browse the repository at this point in the history
Summary:
related to T15243
remove beast as dependancy from async_traits and rebind_executor

run mqtt_features test on websocket-tcp client instead (tcp broker is usually overloaded)

add tests for next_layer/lowest_layer

Reviewers: ivica

Reviewed By: ivica

Subscribers: iljazovic, miljen

Differential Revision: https://repo.mireo.local/D32212
  • Loading branch information
ksimicevic committed Nov 19, 2024
1 parent f80c189 commit edb9410
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 71 deletions.
127 changes: 77 additions & 50 deletions include/async_mqtt5/detail/async_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
#include <boost/asio/prefer.hpp>
#include <boost/asio/write.hpp>

#include <boost/beast/core/stream_traits.hpp>

#include <boost/type_traits/detected_or.hpp>
#include <boost/type_traits/is_detected.hpp>
#include <boost/type_traits/remove_cv_ref.hpp>
Expand All @@ -35,6 +33,8 @@ void assign_tls_sni(const authority_path& ap, TlsContext& ctx, TlsStream& s);

namespace detail {

// tracking executor

template <typename Handler, typename DfltExecutor>
using tracking_type = std::decay_t<
typename asio::prefer_result<
Expand All @@ -52,18 +52,8 @@ tracking_executor(const Handler& handler, const DfltExecutor& ex) {
);
}

template <typename T, typename ...Ts>
using async_write_sig = decltype(
std::declval<T&>().async_write(std::declval<Ts>()...)
);

constexpr auto write_handler_t = [](error_code, size_t) {};

template <typename T, typename B>
constexpr bool has_async_write = boost::is_detected<
async_write_sig, T, B, decltype(write_handler_t)
>::value;

// tls handshake

constexpr auto handshake_handler_t = [](error_code) {};

Expand All @@ -84,6 +74,7 @@ constexpr bool has_tls_handshake = boost::is_detected<
decltype(handshake_handler_t)
>::value;

// websocket handshake

template <typename T, typename ...Ts>
using async_ws_handshake_sig = decltype(
Expand All @@ -97,59 +88,67 @@ constexpr bool has_ws_handshake = boost::is_detected<
decltype(handshake_handler_t)
>::value;

// next layer

template <typename T>
using tls_context_sig = decltype(
std::declval<T&>().tls_context()
);
using next_layer_sig = decltype(std::declval<T&>().next_layer());

template <typename T>
constexpr bool has_tls_context = boost::is_detected<
tls_context_sig, T
constexpr bool has_next_layer = boost::is_detected<
next_layer_sig, boost::remove_cv_ref_t<T>
>::value;

template <typename T, typename Enable = void>
struct next_layer_type_impl {
using type = T;
};

template <typename T>
using next_layer_sig = decltype(
std::declval<T&>().next_layer()
);
struct next_layer_type_impl<T, std::enable_if_t<has_next_layer<T>>> {
using type = typename T::next_layer_type;
};

template <typename T>
constexpr bool has_next_layer = boost::is_detected<
next_layer_sig, T
>::value;
using next_layer_type = typename next_layer_type_impl<
boost::remove_cv_ref_t<T>
>::type;

template <typename T>
next_layer_type<T>& next_layer(T&& a) {
if constexpr (has_next_layer<T>)
return a.next_layer();
else
return std::forward<T>(a);
}

// lowest layer

template <typename T, typename Enable = void>
struct next_layer_type {
struct lowest_layer_type_impl {
using type = T;
};

template <typename T>
struct next_layer_type<
T, std::enable_if_t<has_next_layer<T>>
> {
using type = typename std::remove_reference_t<T>::next_layer_type;
struct lowest_layer_type_impl<T, std::enable_if_t<has_next_layer<T>>> {
using type = typename lowest_layer_type_impl<
next_layer_type<T>
>::type;
};

template <typename T>
typename next_layer_type<T, std::enable_if_t<!has_next_layer<T>>>::type&
next_layer(T&& a) {
return a;
}
using lowest_layer_type = typename lowest_layer_type_impl<
boost::remove_cv_ref_t<T>
>::type;

template <typename T>
typename next_layer_type<T, std::enable_if_t<has_next_layer<T>>>::type&
next_layer(T&& a) {
return a.next_layer();
lowest_layer_type<T>& lowest_layer(T&& a) {
if constexpr (has_next_layer<T>)
return lowest_layer(a.next_layer());
else
return std::forward<T>(a);
}

template <typename S>
using lowest_layer_type = typename boost::beast::lowest_layer_type<S>;

template <typename S>
lowest_layer_type<S>& lowest_layer(S&& a) {
return boost::beast::get_lowest_layer(std::forward<S>(a));
}
// tls layer

template <typename T, typename Enable = void>
struct has_tls_layer_impl : std::false_type {};
Expand All @@ -171,6 +170,41 @@ constexpr bool has_tls_layer = has_tls_layer_impl<
boost::remove_cv_ref_t<T>
>::value;

// tls context

template <typename T>
using tls_context_sig = decltype(
std::declval<T&>().tls_context()
);

template <typename T>
constexpr bool has_tls_context = boost::is_detected<
tls_context_sig, T
>::value;

// setup_tls_sni

template <typename TlsContext, typename Stream>
void setup_tls_sni(const authority_path& ap, TlsContext& ctx, Stream& s) {
if constexpr (has_tls_handshake<Stream>)
assign_tls_sni(ap, ctx, s);
else if constexpr (has_next_layer<Stream>)
setup_tls_sni(ap, ctx, next_layer(s));
}

// async_write

template <typename T, typename ...Ts>
using async_write_sig = decltype(
std::declval<T&>().async_write(std::declval<Ts>()...)
);

constexpr auto write_handler_t = [](error_code, size_t) {};

template <typename T, typename B>
constexpr bool has_async_write = boost::is_detected<
async_write_sig, T, B, decltype(write_handler_t)
>::value;

template <
typename Stream,
Expand All @@ -190,13 +224,6 @@ decltype(auto) async_write(
);
}

template <typename TlsContext, typename Stream>
void setup_tls_sni(const authority_path& ap, TlsContext& ctx, Stream& s) {
if constexpr (has_tls_handshake<Stream>)
assign_tls_sni(ap, ctx, s);
else if constexpr (has_next_layer<Stream>)
setup_tls_sni(ap, ctx, next_layer(s));
}

} // end namespace detail

Expand Down
24 changes: 18 additions & 6 deletions include/async_mqtt5/detail/rebind_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
#ifndef ASYNC_MQTT5_REBIND_EXECUTOR_HPP
#define ASYNC_MQTT5_REBIND_EXECUTOR_HPP

#include <boost/beast/websocket/stream.hpp>

namespace boost::asio::ssl {

// forward declare to preserve optional OpenSSL dependency
Expand All @@ -18,6 +16,15 @@ class stream;

} // end namespace boost::asio::ssl

// forward declare to avoid Beast dependency

namespace boost::beast::websocket {

template <typename Stream, bool deflate_supported>
class stream;

}// end namespace boost::beast::websocket

namespace async_mqtt5::detail {

namespace asio = boost::asio;
Expand All @@ -30,14 +37,19 @@ struct rebind_executor {
// asio::ssl::stream does not define a rebind_executor member type
template <typename Stream, typename Executor>
struct rebind_executor<asio::ssl::stream<Stream>, Executor> {
using other = typename asio::ssl::stream<typename rebind_executor<Stream, Executor>::other>;
using other = typename asio::ssl::stream<
typename rebind_executor<Stream, Executor>::other
>;
};

template <typename Stream, typename Executor>
struct rebind_executor<boost::beast::websocket::stream<asio::ssl::stream<Stream>>, Executor> {
template <typename Stream, bool deflate_supported, typename Executor>
struct rebind_executor<
boost::beast::websocket::stream<asio::ssl::stream<Stream>, deflate_supported>,
Executor
> {
using other = typename boost::beast::websocket::stream<
asio::ssl::stream<typename rebind_executor<Stream, Executor>::other>,
boost::beast::websocket::stream<asio::ssl::stream<Stream>>::is_deflate_supported::value
deflate_supported
>;
};

Expand Down
4 changes: 2 additions & 2 deletions include/async_mqtt5/impl/connect_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ class connect_op {
);
}
else if constexpr (
has_tls_handshake<typename next_layer_type<Stream>::type>
has_tls_handshake<next_layer_type<Stream>>
) {
_stream.next_layer().async_handshake(
tls_handshake_type<typename next_layer_type<Stream>::type>::client,
tls_handshake_type<next_layer_type<Stream>>::client,
asio::append(
asio::prepend(std::move(*this), on_tls_handshake {}),
std::move(ep), std::move(ap)
Expand Down
16 changes: 9 additions & 7 deletions test/integration/mqtt_features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include <boost/asio/experimental/awaitable_operators.hpp>

#include <boost/beast/websocket/stream.hpp>

#include <async_mqtt5.hpp>

BOOST_AUTO_TEST_SUITE(mqtt_features/*, *boost::unit_test::disabled()*/)
Expand All @@ -34,9 +36,9 @@ constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable);

constexpr auto test_duration = std::chrono::seconds(5);

using stream_type = asio::ip::tcp::socket;
using stream_type = boost::beast::websocket::stream<asio::ip::tcp::socket>;

constexpr auto broker = "broker.hivemq.com";
constexpr auto broker = "broker.hivemq.com/mqtt";
constexpr auto connect_wait_dur = std::chrono::milliseconds(200);
constexpr auto topic = "async-mqtt5/test";
constexpr auto share_topic = "$share/sharename/async-mqtt5/test";
Expand All @@ -62,7 +64,7 @@ asio::awaitable<void> test_manual_use_topic_alias() {
auto ex = co_await asio::this_coro::executor;

mqtt_client<stream_type> client(ex);
client.brokers(broker)
client.brokers(broker, 8000)
.connect_property(prop::topic_alias_maximum, uint16_t(10))
.async_run(asio::detached);

Expand Down Expand Up @@ -94,7 +96,7 @@ asio::awaitable<void> test_subscription_identifiers() {
auto ex = co_await asio::this_coro::executor;

mqtt_client<stream_type> client(ex);
client.brokers(broker)
client.brokers(broker, 8000)
.async_run(asio::detached);

publish_props pprops;
Expand All @@ -109,7 +111,7 @@ asio::awaitable<void> test_subscription_identifiers() {
sprops[prop::subscription_identifier] = sub_id;

subscribe_options sub_opts = { .no_local = no_local_e::no };
subscribe_topic sub_topic = { share_topic, sub_opts };
subscribe_topic sub_topic = { topic, sub_opts };
auto&& [ec_2, rcs, __] = co_await client.async_subscribe(
sub_topic, sprops, use_nothrow_awaitable
);
Expand All @@ -135,7 +137,7 @@ asio::awaitable<void> test_shared_subscription() {
auto ex = co_await asio::this_coro::executor;

mqtt_client<stream_type> client(ex);
client.brokers(broker)
client.brokers(broker, 8000)
.async_run(asio::detached);

subscribe_options sub_opts = { .no_local = no_local_e::no };
Expand Down Expand Up @@ -170,7 +172,7 @@ asio::awaitable<void> test_user_property() {
auto ex = co_await asio::this_coro::executor;

mqtt_client<stream_type> client(ex);
client.brokers(broker)
client.brokers(broker, 8000)
.async_run(asio::detached);

publish_props pprops;
Expand Down
Loading

0 comments on commit edb9410

Please sign in to comment.