From 7c6141811d7e8f52e56a616a787505b9c8f3cfef Mon Sep 17 00:00:00 2001 From: mfrankovi Date: Thu, 3 Oct 2024 14:44:50 +0200 Subject: [PATCH] chore: rpc requests limit --- libraries/config/include/config/network.hpp | 2 ++ libraries/config/src/network.cpp | 5 ++++ .../network/include/network/http_server.hpp | 9 +++++-- .../network/include/network/ws_server.hpp | 8 +++++- .../core_libs/network/src/http_server.cpp | 27 ++++++++++++++----- libraries/core_libs/network/src/ws_server.cpp | 16 ++++++++--- .../core_libs/node/include/node/node.hpp | 4 +-- libraries/core_libs/node/src/node.cpp | 24 ++++++++--------- 8 files changed, 68 insertions(+), 27 deletions(-) diff --git a/libraries/config/include/config/network.hpp b/libraries/config/include/config/network.hpp index d9f1c07c19..477a8155cd 100644 --- a/libraries/config/include/config/network.hpp +++ b/libraries/config/include/config/network.hpp @@ -22,6 +22,8 @@ struct ConnectionConfig { // Number of threads dedicated to the rpc calls processing, default = 5 uint16_t threads_num{5}; + uint32_t max_pending_tasks{100}; + void validate() const; }; diff --git a/libraries/config/src/network.cpp b/libraries/config/src/network.cpp index 98031b74bf..cb37200713 100644 --- a/libraries/config/src/network.cpp +++ b/libraries/config/src/network.cpp @@ -33,6 +33,11 @@ void dec_json(const Json::Value &json, ConnectionConfig &config) { config.ws_port = ws_port.asUInt(); } + // max pending tasks + if (auto max_pending_tasks = getConfigData(json, {"max_pending_tasks"}, true); !max_pending_tasks.isNull()) { + config.max_pending_tasks = max_pending_tasks.asUInt(); + } + // number of threads processing rpc calls if (auto threads_num = getConfigData(json, {"threads_num"}, true); !threads_num.isNull()) { config.threads_num = threads_num.asUInt(); diff --git a/libraries/core_libs/network/include/network/http_server.hpp b/libraries/core_libs/network/include/network/http_server.hpp index 615f0cb954..050f425131 100644 --- a/libraries/core_libs/network/include/network/http_server.hpp +++ b/libraries/core_libs/network/include/network/http_server.hpp @@ -4,6 +4,7 @@ #include #include +#include "common/thread_pool.hpp" #include "common/types.hpp" #include "logger/logger.hpp" @@ -22,8 +23,8 @@ class HttpHandler; class HttpServer : public std::enable_shared_from_this { public: - HttpServer(boost::asio::io_context& io, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr, - const std::shared_ptr& request_processor); + HttpServer(std::shared_ptr thread_pool, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr, + const std::shared_ptr& request_processor, uint32_t max_pending_tasks); virtual ~HttpServer() { HttpServer::stop(); } @@ -31,6 +32,8 @@ class HttpServer : public std::enable_shared_from_this { bool stop(); void accept(); + uint32_t numberOfPendingTasks() { return thread_pool_->num_pending_tasks(); } + bool pendingTasksOverLimit() { return thread_pool_->num_pending_tasks() > max_pending_tasks_; } boost::asio::io_context& getIoContext() { return io_context_; } std::shared_ptr getShared(); std::shared_ptr createConnection(); @@ -45,6 +48,8 @@ class HttpServer : public std::enable_shared_from_this { boost::asio::io_context& io_context_; boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::endpoint ep_; + std::shared_ptr thread_pool_; + uint32_t max_pending_tasks_; LOG_OBJECTS_DEFINE }; // QQ: diff --git a/libraries/core_libs/network/include/network/ws_server.hpp b/libraries/core_libs/network/include/network/ws_server.hpp index e1290c8aef..506b2d58ec 100644 --- a/libraries/core_libs/network/include/network/ws_server.hpp +++ b/libraries/core_libs/network/include/network/ws_server.hpp @@ -16,6 +16,7 @@ #include #include +#include "common/thread_pool.hpp" #include "config/config.hpp" #include "dag/dag_block.hpp" #include "final_chain/data.hpp" @@ -84,7 +85,8 @@ class WsSession : public std::enable_shared_from_this { // Accepts incoming connections and launches the sessions class WsServer : public std::enable_shared_from_this, public jsonrpc::AbstractServerConnector { public: - WsServer(boost::asio::io_context& ioc, tcp::endpoint endpoint, addr_t node_addr); + WsServer(std::shared_ptr thread_pool, tcp::endpoint endpoint, addr_t node_addr, + uint32_t max_pending_tasks); virtual ~WsServer(); WsServer(const WsServer&) = delete; @@ -101,6 +103,8 @@ class WsServer : public std::enable_shared_from_this, public jsonrpc:: void newPendingTransaction(const trx_hash_t& trx_hash); void newPillarBlockData(const pillar_chain::PillarBlockData& pillar_block_data); uint32_t numberOfSessions(); + uint32_t numberOfPendingTasks() { return thread_pool_->num_pending_tasks(); } + bool pendingTasksOverLimit() { return thread_pool_->num_pending_tasks() > max_pending_tasks_; } virtual std::shared_ptr createSession(tcp::socket&& socket) = 0; @@ -118,6 +122,8 @@ class WsServer : public std::enable_shared_from_this, public jsonrpc:: boost::shared_mutex sessions_mtx_; protected: + std::shared_ptr thread_pool_; + uint32_t max_pending_tasks_; const addr_t node_addr_; }; diff --git a/libraries/core_libs/network/src/http_server.cpp b/libraries/core_libs/network/src/http_server.cpp index 1c82a131db..4a29cca056 100644 --- a/libraries/core_libs/network/src/http_server.cpp +++ b/libraries/core_libs/network/src/http_server.cpp @@ -2,9 +2,15 @@ namespace taraxa::net { -HttpServer::HttpServer(boost::asio::io_context &io, boost::asio::ip::tcp::endpoint ep, const addr_t &node_addr, - const std::shared_ptr &request_processor) - : request_processor_(request_processor), io_context_(io), acceptor_(io), ep_(std::move(ep)) { +HttpServer::HttpServer(std::shared_ptr thread_pool, boost::asio::ip::tcp::endpoint ep, + const addr_t &node_addr, const std::shared_ptr &request_processor, + uint32_t max_pending_tasks) + : request_processor_(request_processor), + io_context_(thread_pool->unsafe_get_io_context()), + acceptor_(thread_pool->unsafe_get_io_context()), + ep_(std::move(ep)), + thread_pool_(thread_pool), + max_pending_tasks_(max_pending_tasks) { LOG_OBJECTS_CREATE("HTTP"); LOG(log_si_) << "Taraxa HttpServer started at port: " << ep_.port(); } @@ -98,10 +104,17 @@ void HttpConnection::read() { } else { assert(server_->request_processor_); LOG(server_->log_dg_) << "Received: " << request_; - response_ = server_->request_processor_->process(request_); - boost::beast::http::async_write( - socket_, response_, - [this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); }); + + if (server_->pendingTasksOverLimit()) { + LOG(server_->log_er_) << "HttpConnection closed - pending tasks over the limit " + << server_->numberOfPendingTasks(); + stop(); + } else { + response_ = server_->request_processor_->process(request_); + boost::beast::http::async_write( + socket_, response_, + [this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); }); + } } }); } diff --git a/libraries/core_libs/network/src/ws_server.cpp b/libraries/core_libs/network/src/ws_server.cpp index cb991382f9..3a425c5b0a 100644 --- a/libraries/core_libs/network/src/ws_server.cpp +++ b/libraries/core_libs/network/src/ws_server.cpp @@ -49,8 +49,13 @@ void WsSession::on_read(beast::error_code ec, std::size_t bytes_transferred) { return close(is_normal(ec)); } - LOG(log_tr_) << "WS READ " << (static_cast(read_buffer_.data().data())); + auto ws_server = ws_server_.lock(); + if (ws_server && ws_server->pendingTasksOverLimit()) { + LOG(log_er_) << "WS closed - pending tasks over the limit " << ws_server->numberOfPendingTasks(); + return close(true); + } + LOG(log_tr_) << "WS READ " << (static_cast(read_buffer_.data().data())); processAsync(); // Do another read do_read(); @@ -202,8 +207,13 @@ bool WsSession::is_normal(const beast::error_code &ec) const { return false; } -WsServer::WsServer(boost::asio::io_context &ioc, tcp::endpoint endpoint, addr_t node_addr) - : ioc_(ioc), acceptor_(ioc), node_addr_(std::move(node_addr)) { +WsServer::WsServer(std::shared_ptr thread_pool, tcp::endpoint endpoint, addr_t node_addr, + uint32_t max_pending_tasks) + : ioc_(thread_pool->unsafe_get_io_context()), + acceptor_(thread_pool->unsafe_get_io_context()), + thread_pool_(thread_pool), + max_pending_tasks_(max_pending_tasks), + node_addr_(std::move(node_addr)) { LOG_OBJECTS_CREATE("WS_SERVER"); beast::error_code ec; diff --git a/libraries/core_libs/node/include/node/node.hpp b/libraries/core_libs/node/include/node/node.hpp index 77573b7d78..2a658ecd2f 100644 --- a/libraries/core_libs/node/include/node/node.hpp +++ b/libraries/core_libs/node/include/node/node.hpp @@ -80,8 +80,8 @@ class FullNode : public std::enable_shared_from_this { using JsonRpcServer = ModularServer; // should be destroyed after all components, since they may depend on it through unsafe pointers - std::unique_ptr rpc_thread_pool_; - std::unique_ptr graphql_thread_pool_; + std::shared_ptr rpc_thread_pool_; + std::shared_ptr graphql_thread_pool_; // In cae we will you config for this TP, it needs to be unique_ptr !!! util::ThreadPool subscription_pool_; diff --git a/libraries/core_libs/node/src/node.cpp b/libraries/core_libs/node/src/node.cpp index 0ee48d9c94..c05a4d4e5c 100644 --- a/libraries/core_libs/node/src/node.cpp +++ b/libraries/core_libs/node/src/node.cpp @@ -173,7 +173,7 @@ void FullNode::start() { // Inits rpc related members if (conf_.network.rpc) { - rpc_thread_pool_ = std::make_unique(conf_.network.rpc->threads_num); + rpc_thread_pool_ = std::make_shared(conf_.network.rpc->threads_num); net::rpc::eth::EthParams eth_rpc_params; eth_rpc_params.address = getAddress(); eth_rpc_params.chain_id = conf_.genesis.chain_id; @@ -226,16 +226,15 @@ void FullNode::start() { if (conf_.network.rpc->http_port) { auto json_rpc_processor = std::make_shared(); jsonrpc_http_ = std::make_shared( - rpc_thread_pool_->unsafe_get_io_context(), - boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port}, getAddress(), - json_rpc_processor); + rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port}, + getAddress(), json_rpc_processor, conf_.network.rpc->max_pending_tasks); jsonrpc_api_->addConnector(json_rpc_processor); jsonrpc_http_->start(); } if (conf_.network.rpc->ws_port) { jsonrpc_ws_ = std::make_shared( - rpc_thread_pool_->unsafe_get_io_context(), - boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port}, getAddress()); + rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port}, + getAddress(), conf_.network.rpc->max_pending_tasks); jsonrpc_api_->addConnector(jsonrpc_ws_); jsonrpc_ws_->run(); } @@ -280,22 +279,23 @@ void FullNode::start() { *rpc_thread_pool_); } if (conf_.network.graphql) { - graphql_thread_pool_ = std::make_unique(conf_.network.graphql->threads_num); + graphql_thread_pool_ = std::make_shared(conf_.network.graphql->threads_num); if (conf_.network.graphql->ws_port) { graphql_ws_ = std::make_shared( - graphql_thread_pool_->unsafe_get_io_context(), - boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->ws_port}, - getAddress()); + graphql_thread_pool_, + boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->ws_port}, getAddress(), + conf_.network.rpc->max_pending_tasks); // graphql_ws_->run(); } if (conf_.network.graphql->http_port) { graphql_http_ = std::make_shared( - graphql_thread_pool_->unsafe_get_io_context(), + graphql_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->http_port}, getAddress(), std::make_shared(final_chain_, dag_mgr_, pbft_mgr_, trx_mgr_, db_, gas_pricer_, - as_weak(network_), conf_.genesis.chain_id)); + as_weak(network_), conf_.genesis.chain_id), + conf_.network.rpc->max_pending_tasks); graphql_http_->start(); } }