diff --git a/src/net/zmq_async.cpp b/src/net/zmq_async.cpp index b7e8405..9636d6e 100644 --- a/src/net/zmq_async.cpp +++ b/src/net/zmq_async.cpp @@ -86,7 +86,7 @@ namespace net { namespace zmq } } - expect async_client::make(boost::asio::io_service& io, socket zsock) + expect async_client::make(boost::asio::io_context& io, socket zsock) { MONERO_PRECOND(zsock != nullptr); diff --git a/src/net/zmq_async.h b/src/net/zmq_async.h index 471f4ef..cb37707 100644 --- a/src/net/zmq_async.h +++ b/src/net/zmq_async.h @@ -28,7 +28,7 @@ #pragma once #include -#include +#include #include #include #include @@ -63,7 +63,7 @@ namespace net { namespace zmq asocket asock; bool close; - static expect make(boost::asio::io_service& io, socket zsock); + static expect make(boost::asio::io_context& io, socket zsock); }; class read_msg_op diff --git a/src/rest_server.cpp b/src/rest_server.cpp index ac860c3..8a482e4 100644 --- a/src/rest_server.cpp +++ b/src/rest_server.cpp @@ -27,8 +27,10 @@ #include "rest_server.h" #include +#include +#include +#include #include -#include #include #include #include @@ -46,6 +48,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -81,6 +86,53 @@ namespace lws { + struct runtime_options + { + const std::uint32_t max_subaddresses; + const epee::net_utils::ssl_verification_t webhook_verify; + const bool disable_admin_auth; + const bool auto_accept_creation; + }; + + struct rest_server_data + { + boost::asio::io_context io; + const db::storage disk; + const rpc::client client; + const runtime_options options; + std::vector clients; + boost::mutex sync; + + rest_server_data(db::storage disk, rpc::client client, runtime_options options) + : io(), + disk(std::move(disk)), + client(std::move(client)), + options(std::move(options)), + clients(), + sync() + {} + + expect get_async_client(boost::asio::io_context& io) + { + boost::unique_lock lock{sync}; + if (!clients.empty()) + { + net::zmq::async_client out{std::move(clients.back())}; + clients.pop_back(); + return out; + } + lock.unlock(); + return client.make_async_client(io); + } + + void store_async_client(net::zmq::async_client&& client) + { + const boost::lock_guard lock{sync}; + client.close = false; + clients.push_back(std::move(client)); + } + }; + namespace { namespace http = epee::net_utils::http; @@ -272,51 +324,13 @@ namespace lws std::atomic_flag rates_error_once = ATOMIC_FLAG_INIT; - struct runtime_options - { - std::uint32_t max_subaddresses; - epee::net_utils::ssl_verification_t webhook_verify; - bool disable_admin_auth; - bool auto_accept_creation; - }; - - struct rest_server_data - { - const db::storage disk; - const rpc::client client; - const runtime_options options; - - std::vector clients; - boost::mutex sync; - - expect get_async_client(boost::asio::io_service& io) - { - boost::unique_lock lock{sync}; - if (!clients.empty()) - { - net::zmq::async_client out{std::move(clients.back())}; - clients.pop_back(); - return out; - } - lock.unlock(); - return client.make_async_client(io); - } - - void store_async_client(net::zmq::async_client&& client) - { - const boost::lock_guard lock{sync}; - client.close = false; - clients.push_back(std::move(client)); - } - }; - struct daemon_status { using request = rpc::daemon_status_request; using response = epee::byte_slice; // sometimes async using async_response = rpc::daemon_status_response; - static expect handle(const request&, boost::asio::io_service& io, rest_server_data& data, std::function resume) + static expect handle(const request&, rest_server_data& data, std::function&& resume) { using info_rpc = cryptonote::rpc::GetInfo; @@ -327,16 +341,16 @@ namespace lws std::string in; net::zmq::async_client client; boost::asio::steady_timer timer; - boost::asio::io_service::strand strand; + boost::asio::io_context::strand strand; std::vector> resumers; - frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + frame(rest_server_data& parent, net::zmq::async_client client) : parent(std::addressof(parent)), out(), in(), client(std::move(client)), - timer(io), - strand(io), + timer(parent.io), + strand(parent.io), resumers() { info_rpc::Request daemon_req{}; @@ -380,6 +394,7 @@ namespace lws void send_response(const boost::system::error_code error, const expect& value) { assert(self_ != nullptr); + assert(self_->strand.running_in_this_thread()); if (error) MERROR("Failure in /daemon_status: " << error.message()); @@ -420,6 +435,7 @@ namespace lws if (!self_ || error == boost::asio::error::operation_aborted) return; + assert(self_->strand.running_in_this_thread()); MWARNING("Timeout on /daemon_status ZMQ call"); self_->client.close = true; self_->client.asock->cancel(error); @@ -430,7 +446,7 @@ namespace lws if (!self_->timer.expires_after(timeout) && expecting) return false; - self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_})); return true; } @@ -442,18 +458,19 @@ namespace lws return send_response(error, json_response(async_response{})); frame& self = *self_; + assert(self.strand.running_in_this_thread()); BOOST_ASIO_CORO_REENTER(*this) { set_timeout(std::chrono::seconds{2}, false); BOOST_ASIO_CORO_YIELD net::zmq::async_write( - self.client, std::move(self.out), self.strand.wrap(std::move(*this)) + self.client, std::move(self.out), boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!set_timeout(std::chrono::seconds{5}, true)) return send_response(boost::asio::error::operation_aborted, json_response(async_response{})); BOOST_ASIO_CORO_YIELD net::zmq::async_read( - self.client, self.in, self.strand.wrap(std::move(*this)) + self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!self.timer.cancel(error)) @@ -486,18 +503,18 @@ namespace lws } }; - expect client = data.get_async_client(io); + expect client = data.get_async_client(data.io); if (!client) return client.error(); - active = std::make_shared(data, io, std::move(*client)); + active = std::make_shared(data, std::move(*client)); cache.result = nullptr; cache.status = active; active->resumers.push_back(std::move(resume)); lock.unlock(); MDEBUG("Starting new ZMQ request in /daemon_status"); - active->strand.dispatch(async_handler{active}); + boost::asio::dispatch(active->strand, async_handler{active}); return async_ready(); } }; @@ -507,7 +524,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_info_response; - static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(const request& req, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -581,7 +598,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_address_txs_response; - static expect handle(const request& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(const request& req, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -705,7 +722,7 @@ namespace lws using response = void; // always asynchronous response using async_response = rpc::get_random_outs_response; - static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) + static expect handle(request req, const rest_server_data& data, std::function&& resume) { using distribution_rpc = cryptonote::rpc::GetOutputDistribution; using histogram_rpc = cryptonote::rpc::GetOutputHistogram; @@ -846,7 +863,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::get_subaddrs_response; - static expect handle(request const& req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request const& req, const rest_server_data& data, std::function&&) { auto user = open_account(req, data.disk.clone()); if (!user) @@ -925,7 +942,7 @@ namespace lws ); } - static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function resume) + static expect handle(request&& req, rest_server_data& data, std::function&& resume) { struct frame { @@ -934,16 +951,16 @@ namespace lws std::string in; net::zmq::async_client client; boost::asio::steady_timer timer; - boost::asio::io_service::strand strand; + boost::asio::io_context::strand strand; std::vector>> resumers; - frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + frame(rest_server_data& parent, net::zmq::async_client client) : parent(std::addressof(parent)), out(), in(), client(std::move(client)), - timer(io), - strand(io), + timer(parent.io), + strand(parent.io), resumers() { rpc_command::Request req{}; @@ -992,6 +1009,7 @@ namespace lws void send_response(const boost::system::error_code error, expect value) { assert(self_ != nullptr); + assert(self_->strand.running_in_this_thread()); if (error) { @@ -1035,6 +1053,7 @@ namespace lws if (!self_ || error == boost::asio::error::operation_aborted) return; + assert(self_->strand.running_in_this_thread()); MWARNING("Timeout on /get_unspent_outs ZMQ call"); self_->client.close = true; self_->client.asock->cancel(error); @@ -1045,7 +1064,7 @@ namespace lws if (!self_->timer.expires_after(timeout) && expecting) return false; - self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_})); return true; } @@ -1059,18 +1078,19 @@ namespace lws return send_response(error, default_response{}); frame& self = *self_; + assert(self.strand.running_in_this_thread()); BOOST_ASIO_CORO_REENTER(*this) { set_timeout(std::chrono::seconds{2}, false); BOOST_ASIO_CORO_YIELD net::zmq::async_write( - self.client, std::move(self.out), self.strand.wrap(std::move(*this)) + self.client, std::move(self.out), boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!set_timeout(std::chrono::seconds{5}, true)) return send_response(boost::asio::error::operation_aborted, default_response{}); BOOST_ASIO_CORO_YIELD net::zmq::async_read( - self.client, self.in, self.strand.wrap(std::move(*this)) + self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!self.timer.cancel(error)) @@ -1088,18 +1108,18 @@ namespace lws } }; - expect client = data.get_async_client(io); + expect client = data.get_async_client(data.io); if (!client) return client.error(); - active = std::make_shared(data, io, std::move(*client)); + active = std::make_shared(data, std::move(*client)); cache.result = rpc_command::Response{}; cache.status = active; active->resumers.emplace_back(std::move(req), std::move(resume)); lock.unlock(); MDEBUG("Starting new ZMQ request in /get_unspent_outs"); - active->strand.dispatch(async_handler{active}); + boost::asio::dispatch(active->strand, async_handler{active}); return async_ready(); } }; @@ -1109,7 +1129,7 @@ namespace lws using request = rpc::account_credentials; using response = rpc::import_response; - static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request req, const rest_server_data& data, std::function&&) { bool new_request = false; bool fulfilled = false; @@ -1149,7 +1169,7 @@ namespace lws using request = rpc::login_request; using response = rpc::login_response; - static expect handle(request req, boost::asio::io_service& io, const rest_server_data& data, std::function resume) + static expect handle(request req, const rest_server_data& data, std::function&& resume) { if (!key_check(req.creds)) return {lws::error::bad_view_key}; @@ -1206,7 +1226,7 @@ namespace lws using request = rpc::provision_subaddrs_request; using response = rpc::new_subaddrs_response; - static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(const request& req, const rest_server_data& data, std::function&&) { if (!req.maj_i && !req.min_i && !req.n_min && !req.n_maj) return {lws::error::invalid_range}; @@ -1270,7 +1290,7 @@ namespace lws using response = void; // always async using async_response = rpc::submit_raw_tx_response; - static expect handle(request req, boost::asio::io_service& io, rest_server_data& data, std::function resume) + static expect handle(request req, rest_server_data& data, std::function&& resume) { using transaction_rpc = cryptonote::rpc::SendRawTxHex; @@ -1280,15 +1300,15 @@ namespace lws std::string in; net::zmq::async_client client; boost::asio::steady_timer timer; - boost::asio::io_service::strand strand; + boost::asio::io_context::strand strand; std::deque>> resumers; - frame(rest_server_data& parent, boost::asio::io_service& io, net::zmq::async_client client) + frame(rest_server_data& parent, net::zmq::async_client client) : parent(std::addressof(parent)), in(), client(std::move(client)), - timer(io), - strand(io), + timer(parent.io), + strand(parent.io), resumers() {} }; @@ -1330,6 +1350,7 @@ namespace lws void send_response(const boost::system::error_code error, expect value) { assert(self_ != nullptr); + assert(self_->strand.running_in_this_thread()); std::deque>> resumers; { @@ -1365,6 +1386,7 @@ namespace lws if (!self_ || error == boost::asio::error::operation_aborted) return; + assert(self_->strand.running_in_this_thread()); MWARNING("Timeout on /submit_raw_tx ZMQ call"); self_->client.close = true; self_->client.asock->cancel(error); @@ -1375,7 +1397,7 @@ namespace lws if (!self_->timer.expires_after(timeout) && expecting) return false; - self_->timer.async_wait(self_->strand.wrap(on_timeout{self_})); + self_->timer.async_wait(boost::asio::bind_executor(self_->strand, on_timeout{self_})); return true; } @@ -1387,6 +1409,7 @@ namespace lws return send_response(error, async_ready()); frame& self = *self_; + assert(self.strand.running_in_this_thread()); epee::byte_slice next = nullptr; BOOST_ASIO_CORO_REENTER(*this) { @@ -1405,7 +1428,7 @@ namespace lws set_timeout(std::chrono::seconds{10}, false); BOOST_ASIO_CORO_YIELD net::zmq::async_write( - self.client, std::move(next), self.strand.wrap(std::move(*this)) + self.client, std::move(next), boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!set_timeout(std::chrono::seconds{20}, true)) @@ -1413,7 +1436,7 @@ namespace lws self.in.clear(); // could be in moved-from state BOOST_ASIO_CORO_YIELD net::zmq::async_read( - self.client, self.in, self.strand.wrap(std::move(*this)) + self.client, self.in, boost::asio::bind_executor(self.strand, std::move(*this)) ); if (!self.timer.cancel(error)) @@ -1436,18 +1459,18 @@ namespace lws } }; - expect client = data.get_async_client(io); + expect client = data.get_async_client(data.io); if (!client) return client.error(); - active = std::make_shared(data, io, std::move(*client)); + active = std::make_shared(data, std::move(*client)); cache.status = active; active->resumers.emplace_back(std::move(msg), std::move(resume)); lock.unlock(); MDEBUG("Starting new ZMQ request in /submit_raw_tx"); - active->strand.dispatch(async_handler{active}); + boost::asio::dispatch(active->strand, async_handler{active}); return success(); } }; @@ -1457,7 +1480,7 @@ namespace lws using request = rpc::upsert_subaddrs_request; using response = rpc::new_subaddrs_response; - static expect handle(request req, const boost::asio::io_service&, const rest_server_data& data, std::function) + static expect handle(request req, const rest_server_data& data, std::function&&) { if (!data.options.max_subaddresses) return {lws::error::max_subaddresses}; @@ -1494,7 +1517,7 @@ namespace lws }; template - expect call(std::string&& root, boost::asio::io_service& io, rest_server_data& data, std::function resume) + expect call(std::string&& root, rest_server_data& data, std::function&& resume) { using request = typename E::request; using response = typename E::response; @@ -1509,7 +1532,7 @@ namespace lws if (error) return error; - expect resp = E::handle(std::move(req), io, data, std::move(resume)); + expect resp = E::handle(std::move(req), data, std::move(resume)); if (!resp) return resp.error(); return json_response(std::move(resp)); @@ -1534,7 +1557,7 @@ namespace lws } template - expect call_admin(std::string&& root, boost::asio::io_service&, rest_server_data& data, std::function) + expect call_admin(std::string&& root, rest_server_data& data, std::function&&) { using request = typename E::request; @@ -1575,7 +1598,7 @@ namespace lws struct endpoint { char const* const name; - expect (*const run)(std::string&&, boost::asio::io_service&, rest_server_data&, std::function); + expect (*const run)(std::string&&, rest_server_data&, std::function&&); const unsigned max_size; const bool is_async; }; @@ -1686,18 +1709,16 @@ namespace lws struct rest_server::internal { - rest_server_data data; boost::optional prefix; boost::optional admin_prefix; boost::optional ssl_; boost::asio::ip::tcp::acceptor acceptor; - explicit internal(boost::asio::io_service& io_service, lws::db::storage disk, rpc::client client, runtime_options options) - : data{std::move(disk), std::move(client), std::move(options)} - , prefix() + explicit internal(boost::asio::io_context& io) + : prefix() , admin_prefix() , ssl_() - , acceptor(io_service) + , acceptor(io) { assert(std::is_sorted(std::begin(endpoints), std::end(endpoints), by_name)); } @@ -1732,24 +1753,25 @@ namespace lws template struct rest_server::connection { + rest_server_data* global_; internal* parent_; Sock sock_; boost::beast::flat_static_buffer buffer_; boost::optional> parser_; boost::beast::http::response response_; boost::asio::steady_timer timer_; - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; bool keep_alive_; - static boost::asio::ip::tcp::socket make_socket(std::true_type, internal* parent) + static boost::asio::ip::tcp::socket make_socket(std::true_type, rest_server_data* global, internal*) { - return boost::asio::ip::tcp::socket{GET_IO_SERVICE(parent->acceptor)}; + return boost::asio::ip::tcp::socket{global->io}; } - static boost::asio::ssl::stream make_socket(std::false_type, internal* parent) + static boost::asio::ssl::stream make_socket(std::false_type, rest_server_data* global, internal* parent) { return boost::asio::ssl::stream{ - GET_IO_SERVICE(parent->acceptor), parent->ssl_.value() + global->io, parent->ssl_.value() }; } @@ -1765,14 +1787,15 @@ namespace lws boost::asio::ip::tcp::socket& sock() { return get_tcp(sock_); } - explicit connection(internal* parent) noexcept - : parent_(parent), - sock_(make_socket(std::is_same(), parent)), + explicit connection(rest_server_data* global, internal* parent) noexcept + : global_(global), + parent_(parent), + sock_(make_socket(std::is_same(), global, parent)), buffer_{}, parser_{}, response_{}, - timer_(GET_IO_SERVICE(parent->acceptor)), - strand_(GET_IO_SERVICE(parent->acceptor)), + timer_(global->io), + strand_(global->io), keep_alive_(true) {} @@ -1840,6 +1863,7 @@ namespace lws if (!self_ || error == boost::asio::error::operation_aborted) return; + assert(self_->strand_.running_in_this_thread()); MWARNING("Timeout on REST connection to " << self_->sock().remote_endpoint(error) << " / " << self_.get()); self_->sock().cancel(error); self_->shutdown(); @@ -1848,7 +1872,7 @@ namespace lws if (!self->timer_.expires_after(timeout) && existing) return false; // timeout queued, just abort - self->timer_.async_wait(self->strand_.wrap(on_timeout{self})); + self->timer_.async_wait(boost::asio::bind_executor(self->strand_, on_timeout{self})); return true; } @@ -1878,7 +1902,7 @@ namespace lws connection& self = *self_; self.sock_.async_handshake( boost::asio::ssl::stream::server, - self.strand_.wrap(std::move(*this)) + boost::asio::bind_executor(self.strand_, std::move(*this)) ); } @@ -1919,7 +1943,9 @@ namespace lws { /* The `resumer` callback can be invoked in another strand (created by the handler function), and therefore needs to be "wrapped" to - ensure thread safety. This also allows `resume` to be unwrapped. */ + ensure thread safety. This also allows `resume` to be unwrapped. + DO NOT use `boost::asio::bind_executor` here as it doesn't create + a new callable like `wrap` does. */ const auto& self = self_; resumer = self->strand_.wrap( [self, resume] (expect body) mutable @@ -1933,7 +1959,7 @@ namespace lws } MDEBUG("Running REST handler " << handler->name << " on " << self_.get()); - auto body = handler->run(std::move(self_->parser_->get()).body(), GET_IO_SERVICE(self_->timer_), self_->parent_->data, std::move(resumer)); + auto body = handler->run(std::move(self_->parser_->get()).body(), *self_->global_, std::move(resumer)); if (!body) return self_->bad_request(body.error(), std::forward(resume)); else if (!handler->is_async || !body->empty()) @@ -1980,7 +2006,7 @@ namespace lws MDEBUG("Reading new REST request from " << self_.get()); BOOST_ASIO_CORO_YIELD boost::beast::http::async_read( - self.sock_, self.buffer_, *self.parser_, self.strand_.wrap(std::move(*this)) + self.sock_, self.buffer_, *self.parser_, boost::asio::bind_executor(self.strand_, std::move(*this)) ); // async_response will have its own timeouts set in handlers if async @@ -1993,7 +2019,7 @@ namespace lws connection::set_timeout(self_, rest_response_timeout, false); BOOST_ASIO_CORO_YIELD boost::beast::http::async_write( - self.sock_, self.response_, self.strand_.wrap(std::move(*this)) + self.sock_, self.response_, boost::asio::bind_executor(self.strand_, std::move(*this)) ); if (!self.keep_alive_) @@ -2006,24 +2032,25 @@ namespace lws template struct rest_server::accept_loop final : public boost::asio::coroutine { - internal* self_; + rest_server_data* global_; + internal* parent_; std::shared_ptr> next_; - explicit accept_loop(internal* self) noexcept - : self_(self), next_(nullptr) + explicit accept_loop(rest_server_data* global, internal* parent) noexcept + : global_(global), parent_(parent), next_(nullptr) {} void operator()(boost::system::error_code error = {}) { - if (!self_) + if (!global_ || !parent_) return; BOOST_ASIO_CORO_REENTER(*this) { for (;;) { - next_ = std::make_shared>(self_); - BOOST_ASIO_CORO_YIELD self_->acceptor.async_accept(next_->sock(), std::move(*this)); + next_ = std::make_shared>(global_, parent_); + BOOST_ASIO_CORO_YIELD parent_->acceptor.async_accept(next_->sock(), std::move(*this)); if (error) { @@ -2032,7 +2059,7 @@ namespace lws else { MDEBUG("New connection to " << next_->sock().remote_endpoint(error) << " / " << next_.get()); - next_->strand_.dispatch(handler_loop{next_}); + boost::asio::dispatch(next_->strand_, handler_loop{next_}); } } } @@ -2041,7 +2068,7 @@ namespace lws void rest_server::run_io() { - try { io_service_.run(); } + try { global_->io.run(); } catch (const std::exception& e) { std::raise(SIGINT); @@ -2055,13 +2082,15 @@ namespace lws } rest_server::rest_server(epee::span addresses, std::vector admin, db::storage disk, rpc::client client, configuration config) - : io_service_(), ports_(), workers_() + : global_(std::make_unique(std::move(disk), std::move(client), runtime_options{config.max_subaddresses, config.webhook_verify, config.disable_admin_auth, config.auto_accept_creation})), + ports_(), + workers_() { if (addresses.empty()) MONERO_THROW(common_error::kInvalidArgument, "REST server requires 1 or more addresses"); std::sort(admin.begin(), admin.end()); - const auto init_port = [&admin] (internal& port, const std::string& address, configuration config, const bool is_admin) -> bool + const auto init_port = [this, &admin] (internal& port, const std::string& address, configuration config, const bool is_admin) -> bool { epee::net_utils::http::url_content url{}; if (!epee::net_utils::parse_url(address, url)) @@ -2157,24 +2186,23 @@ namespace lws if (ssl_options) { port.ssl_ = ssl_options.create_context(); - accept_loop>{std::addressof(port)}(); + accept_loop>{global_.get(), std::addressof(port)}(); } else - accept_loop{std::addressof(port)}(); + accept_loop{global_.get(), std::addressof(port)}(); return https; }; bool any_ssl = false; - const runtime_options options{config.max_subaddresses, config.webhook_verify, config.disable_admin_auth, config.auto_accept_creation}; for (const std::string& address : addresses) { - ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options); + ports_.emplace_back(global_->io); any_ssl |= init_port(ports_.back(), address, config, false); } for (const std::string& address : admin) { - ports_.emplace_back(io_service_, disk.clone(), MONERO_UNWRAP(client.clone()), options); + ports_.emplace_back(global_->io); any_ssl |= init_port(ports_.back(), address, config, true); } @@ -2190,7 +2218,7 @@ namespace lws rest_server::~rest_server() noexcept { - io_service_.stop(); + global_->io.stop(); for (auto& t : workers_) { if (t.joinable()) diff --git a/src/rest_server.h b/src/rest_server.h index 2329ee1..7452147 100644 --- a/src/rest_server.h +++ b/src/rest_server.h @@ -27,10 +27,10 @@ #pragma once -#include #include -#include +#include #include +#include #include #include @@ -41,19 +41,20 @@ namespace lws { + struct rest_server_data; class rest_server { struct internal; template struct connection; template struct handler_loop; template struct accept_loop; - - boost::asio::io_service io_service_; //!< Put first so its destroyed last + + std::unique_ptr global_; std::list ports_; std::vector workers_; void run_io(); - + public: struct configuration { @@ -66,14 +67,14 @@ namespace lws bool disable_admin_auth; bool auto_accept_creation; }; - + explicit rest_server(epee::span addresses, std::vector admin, db::storage disk, rpc::client client, configuration config); - + rest_server(rest_server&&) = delete; rest_server(rest_server const&) = delete; - + ~rest_server() noexcept; - + rest_server& operator=(rest_server&&) = delete; rest_server& operator=(rest_server const&) = delete; }; diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 0275e03..3ced99c 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -399,7 +399,7 @@ namespace rpc return {lws::error::bad_daemon_response}; } - expect client::make_async_client(boost::asio::io_service& io) const + expect client::make_async_client(boost::asio::io_context& io) const { MONERO_PRECOND(ctx != nullptr); diff --git a/src/rpc/client.h b/src/rpc/client.h index e93ea0f..6dba671 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -26,7 +26,7 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #pragma once -#include +#include #include #include #include @@ -139,7 +139,7 @@ namespace rpc } //! \return `async_client` to daemon. Thread safe. - expect make_async_client(boost::asio::io_service& io) const; + expect make_async_client(boost::asio::io_context& io) const; /*! Queue `message` for sending to daemon. If the queue is full, wait a diff --git a/src/rpc/scanner/client.cpp b/src/rpc/scanner/client.cpp index 989cc7a..a4837fd 100644 --- a/src/rpc/scanner/client.cpp +++ b/src/rpc/scanner/client.cpp @@ -27,14 +27,15 @@ #include "client.h" +#include #include +#include #include #include #include #include "common/expect.h" // monero/src #include "misc_log_ex.h" // monero/contrib/epee/include -#include "net/net_utils_base.h" // monero/contrib/epee/include #include "rpc/scanner/commands.h" #include "rpc/scanner/connection.h" #include "rpc/scanner/read_commands.h" @@ -121,9 +122,13 @@ namespace lws { namespace rpc { namespace scanner { MINFO("Attempting connection to " << self_->server_address_); self_->connect_timer_.expires_from_now(connect_timeout); - self_->connect_timer_.async_wait(self_->strand_.wrap(close{self_})); + self_->connect_timer_.async_wait( + boost::asio::bind_executor(self_->strand_, close{self_}) + ); - BOOST_ASIO_CORO_YIELD self_->sock_.async_connect(self_->server_address_, self_->strand_.wrap(*this)); + BOOST_ASIO_CORO_YIELD self_->sock_.async_connect( + self_->server_address_, boost::asio::bind_executor(self_->strand_, *this) + ); if (!self_->connect_timer_.cancel() || error) { @@ -135,7 +140,9 @@ namespace lws { namespace rpc { namespace scanner MINFO("Retrying connection in " << std::chrono::seconds{reconnect_interval}.count() << " seconds"); self_->connect_timer_.expires_from_now(reconnect_interval); - BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait(self_->strand_.wrap(*this)); + BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait( + boost::asio::bind_executor(self_->strand_, *this) + ); } MINFO("Connection made to " << self_->server_address_); @@ -147,7 +154,7 @@ namespace lws { namespace rpc { namespace scanner } }; - client::client(boost::asio::io_service& io, const std::string& address, std::string pass, std::vector> local) + client::client(boost::asio::io_context& io, const std::string& address, std::string pass, std::vector> local) : connection(io), local_(std::move(local)), pass_(std::move(pass)), @@ -182,11 +189,14 @@ namespace lws { namespace rpc { namespace scanner { if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch([self] () + boost::asio::dispatch( + self->strand_, + [self] () { if (!self->sock_.is_open()) connector{self}(); - }); + } + ); } void client::push_accounts(const std::shared_ptr& self, std::vector users) @@ -194,7 +204,9 @@ namespace lws { namespace rpc { namespace scanner if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch([self, users = std::move(users)] () mutable + boost::asio::dispatch( + self->strand_, + [self, users = std::move(users)] () mutable { /* Keep this algorithm simple, one user at a time. A little more difficult to do multiples at once */ @@ -207,7 +219,8 @@ namespace lws { namespace rpc { namespace scanner ); self->next_push_ %= self->local_.size(); } - }); + } + ); } void client::replace_accounts(const std::shared_ptr& self, std::vector users) @@ -215,7 +228,9 @@ namespace lws { namespace rpc { namespace scanner if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch([self, users = std::move(users)] () mutable + boost::asio::dispatch( + self->strand_, + [self, users = std::move(users)] () mutable { MINFO("Received " << users.size() << " accounts as new workload"); for (std::size_t i = 0; i < self->local_.size(); ++i) @@ -230,7 +245,8 @@ namespace lws { namespace rpc { namespace scanner self->local_[i]->replace_accounts(std::move(next)); } self->next_push_ = 0; - }); + } + ); } void client::send_update(const std::shared_ptr& self, std::vector users, std::vector blocks) @@ -238,17 +254,20 @@ namespace lws { namespace rpc { namespace scanner if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () mutable + boost::asio::dispatch( + self->strand_, + [self, users = std::move(users), blocks = std::move(blocks)] () mutable { if (!self->connected_) MONERO_THROW(common_error::kInvalidArgument, "not connected"); write_command(self, update_accounts{std::move(users), std::move(blocks)}); - }); + } + ); } void client::cleanup() { base_cleanup(); - GET_IO_SERVICE(sock_).stop(); + context().stop(); } }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/client.h b/src/rpc/scanner/client.h index fa0831b..03f9670 100644 --- a/src/rpc/scanner/client.h +++ b/src/rpc/scanner/client.h @@ -26,7 +26,7 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #pragma once -#include +#include #include #include #include @@ -81,7 +81,7 @@ namespace lws { namespace rpc { namespace scanner //! Send `users` upstream for disk storage static void send_update(const std::shared_ptr& self, std::vector users, std::vector blocks); - //! Closes socket and calls stop on `io_service`. + //! Closes socket and calls stop on `io_context`. void cleanup(); }; }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/connection.cpp b/src/rpc/scanner/connection.cpp index 48a1f3b..904fcc2 100644 --- a/src/rpc/scanner/connection.cpp +++ b/src/rpc/scanner/connection.cpp @@ -31,7 +31,7 @@ namespace lws { namespace rpc { namespace scanner { - connection::connection(boost::asio::io_service& io) + connection::connection(boost::asio::io_context& io) : read_buf_(), write_bufs_(), sock_(io), diff --git a/src/rpc/scanner/connection.h b/src/rpc/scanner/connection.h index 379d50f..99ac27e 100644 --- a/src/rpc/scanner/connection.h +++ b/src/rpc/scanner/connection.h @@ -28,7 +28,7 @@ #include #include -#include +#include #include #include #include @@ -50,13 +50,15 @@ namespace lws { namespace rpc { namespace scanner std::deque write_bufs_; boost::asio::ip::tcp::socket sock_; boost::asio::steady_timer write_timeout_; - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; header next_; bool cleanup_; - explicit connection(boost::asio::io_service& io); + explicit connection(boost::asio::io_context& io); ~connection(); + boost::asio::io_context& context() const { return strand_.context(); } + boost::asio::ip::tcp::endpoint remote_endpoint(); //! \return ASIO compatible read buffer of `size`. diff --git a/src/rpc/scanner/read_commands.h b/src/rpc/scanner/read_commands.h index 2a6acb0..ebc951d 100644 --- a/src/rpc/scanner/read_commands.h +++ b/src/rpc/scanner/read_commands.h @@ -26,7 +26,9 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #pragma once +#include #include +#include #include #include #include @@ -120,11 +122,15 @@ namespace lws { namespace rpc { namespace scanner for (;;) // multiple commands { // indefinite read timeout (waiting for next command) - BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(sizeof(self_->next_)), self_->strand_.wrap(*this)); + BOOST_ASIO_CORO_YIELD boost::asio::async_read( + self_->sock_, self_->read_buffer(sizeof(self_->next_)), boost::asio::bind_executor(self_->strand_, *this) + ); std::memcpy(std::addressof(self_->next_), self_->read_buf_.data(), sizeof(self_->next_)); static_assert(std::numeric_limits::max() <= std::numeric_limits::max()); - BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(self_->next_.length.value()), self_->strand_.wrap(*this)); + BOOST_ASIO_CORO_YIELD boost::asio::async_read( + self_->sock_, self_->read_buffer(self_->next_.length.value()), boost::asio::bind_executor(self_->strand_, *this) + ); const auto& commands = T::commands(); if (commands.size() <= self_->next_.id || !commands[self_->next_.id](self_)) @@ -142,7 +148,7 @@ namespace lws { namespace rpc { namespace scanner { if (!self) return false; - self->strand_.dispatch(do_read_commands{self}); + boost::asio::dispatch(self->strand_, do_read_commands{self}); return true; } }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/server.cpp b/src/rpc/scanner/server.cpp index 15888c1..2b6c6e2 100644 --- a/src/rpc/scanner/server.cpp +++ b/src/rpc/scanner/server.cpp @@ -27,7 +27,9 @@ #include "server.h" +#include #include +#include #include #include #include @@ -38,7 +40,6 @@ #include "common/expect.h" // monero/src/ #include "error.h" #include "misc_log_ex.h" // monero/contrib/epee/include -#include "net/net_utils_base.h" // monero/contrib/epee/include #include "rpc/scanner/commands.h" #include "rpc/scanner/connection.h" #include "rpc/scanner/read_commands.h" @@ -79,7 +80,7 @@ namespace lws { namespace rpc { namespace scanner std::size_t threads_; //!< Number of scan threads at remote process public: - explicit server_connection(std::shared_ptr parent, boost::asio::io_service& io) + explicit server_connection(std::shared_ptr parent, boost::asio::io_context& io) : connection(io), parent_(std::move(parent)), threads_(0) @@ -174,8 +175,10 @@ namespace lws { namespace rpc { namespace scanner { for (;;) { - next_ = std::make_shared(self_, GET_IO_SERVICE(self_->check_timer_)); - BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept(next_->sock_, self_->strand_.wrap(*this)); + next_ = std::make_shared(self_, self_->strand_.context()); + BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept( + next_->sock_, boost::asio::bind_executor(self_->strand_, *this) + ); MINFO("New connection to " << next_->remote_endpoint() << " / " << next_.get()); @@ -197,7 +200,7 @@ namespace lws { namespace rpc { namespace scanner assert(self_->strand_.running_in_this_thread()); self_->check_timer_.expires_from_now(account_poll_interval); - self_->check_timer_.async_wait(self_->strand_.wrap(*this)); + self_->check_timer_.async_wait(boost::asio::bind_executor(self_->strand_, *this)); std::size_t total_threads = self_->local_.size(); std::vector> remotes{}; @@ -423,7 +426,7 @@ namespace lws { namespace rpc { namespace scanner }; } - server::server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify) + server::server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify) : strand_(io), check_timer_(io), acceptor_(io), @@ -484,7 +487,9 @@ namespace lws { namespace rpc { namespace scanner return; auto endpoint = get_endpoint(address); - self->strand_.dispatch([self, endpoint = std::move(endpoint), pass = std::move(pass)] () + boost::asio::dispatch( + self->strand_, + [self, endpoint = std::move(endpoint), pass = std::move(pass)] () { self->acceptor_.close(); self->acceptor_.open(endpoint.protocol()); @@ -495,21 +500,22 @@ namespace lws { namespace rpc { namespace scanner self->compute_hash(self->pass_hashed_, pass); acceptor{std::move(self)}(); - }); + } + ); } void server::start_user_checking(const std::shared_ptr& self) { if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch(check_users{self}); + boost::asio::dispatch(self->strand_, check_users{self}); } void server::replace_users(const std::shared_ptr& self) { if (!self) MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); - self->strand_.dispatch([self] () { self->do_replace_users(); }); + boost::asio::dispatch(self->strand_, [self] () { self->do_replace_users(); }); } void server::store(const std::shared_ptr& self, std::vector users, std::vector blocks) @@ -518,11 +524,14 @@ namespace lws { namespace rpc { namespace scanner MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); std::sort(users.begin(), users.end(), by_height{}); - self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () + boost::asio::dispatch( + self->strand_, + [self, users = std::move(users), blocks = std::move(blocks)] () { const lws::scanner_options opts{self->webhook_verify_, false, false}; if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts)) - GET_IO_SERVICE(self->check_timer_).stop(); - }); + self->strand_.context().stop(); + } + ); } }}} // lws // rpc // scanner diff --git a/src/rpc/scanner/server.h b/src/rpc/scanner/server.h index 6be369c..2f14d31 100644 --- a/src/rpc/scanner/server.h +++ b/src/rpc/scanner/server.h @@ -27,7 +27,7 @@ #pragma once #include -#include +#include #include #include #include @@ -57,7 +57,7 @@ namespace lws { namespace rpc { namespace scanner needed (basically a REST server on either end). */ class server { - boost::asio::io_service::strand strand_; + boost::asio::io_context::strand strand_; boost::asio::steady_timer check_timer_; boost::asio::ip::tcp::acceptor acceptor_; std::set, std::owner_less>> remote_; @@ -82,7 +82,7 @@ namespace lws { namespace rpc { namespace scanner public: static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address); - explicit server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify); + explicit server(boost::asio::io_context& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify); server(const server&) = delete; server(server&&) = delete; diff --git a/src/rpc/scanner/write_commands.h b/src/rpc/scanner/write_commands.h index d13384b..e996493 100644 --- a/src/rpc/scanner/write_commands.h +++ b/src/rpc/scanner/write_commands.h @@ -26,7 +26,9 @@ // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #pragma once +#include #include +#include #include #include #include @@ -117,8 +119,10 @@ namespace lws { namespace rpc { namespace scanner while (!self_->write_bufs_.empty()) { self_->write_timeout_.expires_from_now(std::chrono::seconds{10}); - self_->write_timeout_.async_wait(self_->strand_.wrap(timeout{self_})); - BOOST_ASIO_CORO_YIELD boost::asio::async_write(self_->sock_, self_->write_buffer(), self_->strand_.wrap(*this)); + self_->write_timeout_.async_wait(boost::asio::bind_executor(self_->strand_, timeout{self_})); + BOOST_ASIO_CORO_YIELD boost::asio::async_write( + self_->sock_, self_->write_buffer(), boost::asio::bind_executor(self_->strand_, *this) + ); self_->write_timeout_.cancel(); self_->write_bufs_.pop_front(); } @@ -204,6 +208,6 @@ namespace lws { namespace rpc { namespace scanner } }; - self->strand_.dispatch(queue_slice{self, std::move(msg)}); + boost::asio::dispatch(self->strand_, queue_slice{self, std::move(msg)}); } }}} // lws // rpc // scanner diff --git a/src/scanner.cpp b/src/scanner.cpp index 066c4e4..b518856 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -1323,11 +1323,11 @@ namespace lws thread_count = std::max(std::size_t(1), thread_count); /*! \NOTE Be careful about references and lifetimes of the callbacks. The - ones below are safe because no `io_service::run()` call is after the + ones below are safe because no `io_context::run()` call is after the destruction of the references. \NOTE That `ctx` will need a strand or lock if multiple - `io_service::run()` calls are used. */ + `io_context::run()` calls are used. */ boost::asio::steady_timer rate_timer{sync_.io_}; class rate_updater diff --git a/src/scanner.h b/src/scanner.h index 45774fd..3e00f02 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -27,7 +27,7 @@ #pragma once #include -#include +#include #include #include #include @@ -77,7 +77,7 @@ namespace lws struct scanner_sync { - boost::asio::io_service io_; + boost::asio::io_context io_; std::atomic stop_; //!< Stop scanning but do not shutdown std::atomic shutdown_; //!< Exit scanner::run