Skip to content

Commit

Permalink
chore: rpc requests limit
Browse files Browse the repository at this point in the history
  • Loading branch information
mfrankovi committed Oct 7, 2024
1 parent f3beac2 commit b5d2c1c
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 27 deletions.
2 changes: 2 additions & 0 deletions libraries/config/include/config/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ struct ConnectionConfig {
// Number of threads dedicated to the rpc calls processing, default = 5
uint16_t threads_num{5};

uint32_t max_pending_tasks{100};

void validate() const;
};

Expand Down
5 changes: 5 additions & 0 deletions libraries/config/src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ void dec_json(const Json::Value &json, ConnectionConfig &config) {
config.ws_port = ws_port.asUInt();
}

// max pending tasks
if (auto max_pending_tasks = getConfigData(json, {"max_pending_tasks"}, true); !max_pending_tasks.isNull()) {
config.max_pending_tasks = max_pending_tasks.asUInt();
}

// number of threads processing rpc calls
if (auto threads_num = getConfigData(json, {"threads_num"}, true); !threads_num.isNull()) {
config.threads_num = threads_num.asUInt();
Expand Down
9 changes: 7 additions & 2 deletions libraries/core_libs/network/include/network/http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <boost/asio.hpp>
#include <boost/beast.hpp>

#include "common/thread_pool.hpp"
#include "common/types.hpp"
#include "logger/logger.hpp"

Expand All @@ -22,15 +23,17 @@ class HttpHandler;

class HttpServer : public std::enable_shared_from_this<HttpServer> {
public:
HttpServer(boost::asio::io_context& io, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr,
const std::shared_ptr<HttpProcessor>& request_processor);
HttpServer(std::shared_ptr<util::ThreadPool> thread_pool, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr,
const std::shared_ptr<HttpProcessor>& request_processor, uint32_t max_pending_tasks);

virtual ~HttpServer() { HttpServer::stop(); }

bool start();
bool stop();

void accept();
uint32_t numberOfPendingTasks() const;
bool pendingTasksOverLimit() const { return numberOfPendingTasks() > max_pending_tasks_; }
boost::asio::io_context& getIoContext() { return io_context_; }
std::shared_ptr<HttpServer> getShared();
std::shared_ptr<HttpConnection> createConnection();
Expand All @@ -45,6 +48,8 @@ class HttpServer : public std::enable_shared_from_this<HttpServer> {
boost::asio::io_context& io_context_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::endpoint ep_;
std::weak_ptr<util::ThreadPool> thread_pool_;
uint32_t max_pending_tasks_;
LOG_OBJECTS_DEFINE
};
// QQ:
Expand Down
8 changes: 7 additions & 1 deletion libraries/core_libs/network/include/network/ws_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <thread>
#include <vector>

#include "common/thread_pool.hpp"
#include "config/config.hpp"
#include "dag/dag_block.hpp"
#include "final_chain/data.hpp"
Expand Down Expand Up @@ -84,7 +85,8 @@ class WsSession : public std::enable_shared_from_this<WsSession> {
// Accepts incoming connections and launches the sessions
class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::AbstractServerConnector {
public:
WsServer(boost::asio::io_context& ioc, tcp::endpoint endpoint, addr_t node_addr);
WsServer(std::shared_ptr<util::ThreadPool> thread_pool, tcp::endpoint endpoint, addr_t node_addr,
uint32_t max_pending_tasks);
virtual ~WsServer();

WsServer(const WsServer&) = delete;
Expand All @@ -101,6 +103,8 @@ class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::
void newPendingTransaction(const trx_hash_t& trx_hash);
void newPillarBlockData(const pillar_chain::PillarBlockData& pillar_block_data);
uint32_t numberOfSessions();
uint32_t numberOfPendingTasks() const;
bool pendingTasksOverLimit() const { return numberOfPendingTasks() > max_pending_tasks_; }

virtual std::shared_ptr<WsSession> createSession(tcp::socket&& socket) = 0;

Expand All @@ -118,6 +122,8 @@ class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::
boost::shared_mutex sessions_mtx_;

protected:
std::weak_ptr<util::ThreadPool> thread_pool_;
uint32_t max_pending_tasks_;
const addr_t node_addr_;
};

Expand Down
35 changes: 28 additions & 7 deletions libraries/core_libs/network/src/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

namespace taraxa::net {

HttpServer::HttpServer(boost::asio::io_context &io, boost::asio::ip::tcp::endpoint ep, const addr_t &node_addr,
const std::shared_ptr<HttpProcessor> &request_processor)
: request_processor_(request_processor), io_context_(io), acceptor_(io), ep_(std::move(ep)) {
HttpServer::HttpServer(std::shared_ptr<util::ThreadPool> thread_pool, boost::asio::ip::tcp::endpoint ep,
const addr_t &node_addr, const std::shared_ptr<HttpProcessor> &request_processor,
uint32_t max_pending_tasks)
: request_processor_(request_processor),
io_context_(thread_pool->unsafe_get_io_context()),
acceptor_(thread_pool->unsafe_get_io_context()),
ep_(std::move(ep)),
thread_pool_(thread_pool),
max_pending_tasks_(max_pending_tasks) {
LOG_OBJECTS_CREATE("HTTP");
LOG(log_si_) << "Taraxa HttpServer started at port: " << ep_.port();
}
Expand Down Expand Up @@ -66,6 +72,14 @@ bool HttpServer::stop() {
return true;
}

uint32_t HttpServer::numberOfPendingTasks() const {
auto thread_pool = thread_pool_.lock();
if (thread_pool) {
return thread_pool->num_pending_tasks();
}
return 0;
}

std::shared_ptr<HttpConnection> HttpConnection::getShared() {
try {
return shared_from_this();
Expand Down Expand Up @@ -98,10 +112,17 @@ void HttpConnection::read() {
} else {
assert(server_->request_processor_);
LOG(server_->log_dg_) << "Received: " << request_;
response_ = server_->request_processor_->process(request_);
boost::beast::http::async_write(
socket_, response_,
[this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); });

if (server_->pendingTasksOverLimit()) {
LOG(server_->log_er_) << "HttpConnection closed - pending tasks over the limit "
<< server_->numberOfPendingTasks();
stop();
} else {
response_ = server_->request_processor_->process(request_);
boost::beast::http::async_write(
socket_, response_,
[this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); });
}
}
});
}
Expand Down
24 changes: 21 additions & 3 deletions libraries/core_libs/network/src/ws_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ void WsSession::on_read(beast::error_code ec, std::size_t bytes_transferred) {
return close(is_normal(ec));
}

LOG(log_tr_) << "WS READ " << (static_cast<char *>(read_buffer_.data().data()));
auto ws_server = ws_server_.lock();
if (ws_server && ws_server->pendingTasksOverLimit()) {
LOG(log_er_) << "WS closed - pending tasks over the limit " << ws_server->numberOfPendingTasks();
return close(true);
}

LOG(log_tr_) << "WS READ " << (static_cast<char *>(read_buffer_.data().data()));
processAsync();
// Do another read
do_read();
Expand Down Expand Up @@ -202,8 +207,13 @@ bool WsSession::is_normal(const beast::error_code &ec) const {
return false;
}

WsServer::WsServer(boost::asio::io_context &ioc, tcp::endpoint endpoint, addr_t node_addr)
: ioc_(ioc), acceptor_(ioc), node_addr_(std::move(node_addr)) {
WsServer::WsServer(std::shared_ptr<util::ThreadPool> thread_pool, tcp::endpoint endpoint, addr_t node_addr,
uint32_t max_pending_tasks)
: ioc_(thread_pool->unsafe_get_io_context()),
acceptor_(thread_pool->unsafe_get_io_context()),
thread_pool_(thread_pool),
max_pending_tasks_(max_pending_tasks),
node_addr_(std::move(node_addr)) {
LOG_OBJECTS_CREATE("WS_SERVER");
beast::error_code ec;

Expand Down Expand Up @@ -331,4 +341,12 @@ uint32_t WsServer::numberOfSessions() {
return sessions.size();
}

uint32_t WsServer::numberOfPendingTasks() const {
auto thread_pool = thread_pool_.lock();
if (thread_pool) {
return thread_pool->num_pending_tasks();
}
return 0;
}

} // namespace taraxa::net
4 changes: 2 additions & 2 deletions libraries/core_libs/node/include/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class FullNode : public std::enable_shared_from_this<FullNode> {
using JsonRpcServer = ModularServer<net::TaraxaFace, net::NetFace, net::EthFace, net::TestFace, net::DebugFace>;

// should be destroyed after all components, since they may depend on it through unsafe pointers
std::unique_ptr<util::ThreadPool> rpc_thread_pool_;
std::unique_ptr<util::ThreadPool> graphql_thread_pool_;
std::shared_ptr<util::ThreadPool> rpc_thread_pool_;
std::shared_ptr<util::ThreadPool> graphql_thread_pool_;

// In cae we will you config for this TP, it needs to be unique_ptr !!!
util::ThreadPool subscription_pool_;
Expand Down
24 changes: 12 additions & 12 deletions libraries/core_libs/node/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void FullNode::start() {

// Inits rpc related members
if (conf_.network.rpc) {
rpc_thread_pool_ = std::make_unique<util::ThreadPool>(conf_.network.rpc->threads_num);
rpc_thread_pool_ = std::make_shared<util::ThreadPool>(conf_.network.rpc->threads_num);
net::rpc::eth::EthParams eth_rpc_params;
eth_rpc_params.address = getAddress();
eth_rpc_params.chain_id = conf_.genesis.chain_id;
Expand Down Expand Up @@ -226,16 +226,15 @@ void FullNode::start() {
if (conf_.network.rpc->http_port) {
auto json_rpc_processor = std::make_shared<net::JsonRpcHttpProcessor>();
jsonrpc_http_ = std::make_shared<net::HttpServer>(
rpc_thread_pool_->unsafe_get_io_context(),
boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port}, getAddress(),
json_rpc_processor);
rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port},
getAddress(), json_rpc_processor, conf_.network.rpc->max_pending_tasks);
jsonrpc_api_->addConnector(json_rpc_processor);
jsonrpc_http_->start();
}
if (conf_.network.rpc->ws_port) {
jsonrpc_ws_ = std::make_shared<net::JsonRpcWsServer>(
rpc_thread_pool_->unsafe_get_io_context(),
boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port}, getAddress());
rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port},
getAddress(), conf_.network.rpc->max_pending_tasks);
jsonrpc_api_->addConnector(jsonrpc_ws_);
jsonrpc_ws_->run();
}
Expand Down Expand Up @@ -280,22 +279,23 @@ void FullNode::start() {
*rpc_thread_pool_);
}
if (conf_.network.graphql) {
graphql_thread_pool_ = std::make_unique<util::ThreadPool>(conf_.network.graphql->threads_num);
graphql_thread_pool_ = std::make_shared<util::ThreadPool>(conf_.network.graphql->threads_num);
if (conf_.network.graphql->ws_port) {
graphql_ws_ = std::make_shared<net::GraphQlWsServer>(
graphql_thread_pool_->unsafe_get_io_context(),
boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->ws_port},
getAddress());
graphql_thread_pool_,
boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->ws_port}, getAddress(),
conf_.network.rpc->max_pending_tasks);
// graphql_ws_->run();
}

if (conf_.network.graphql->http_port) {
graphql_http_ = std::make_shared<net::HttpServer>(
graphql_thread_pool_->unsafe_get_io_context(),
graphql_thread_pool_,
boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->http_port},
getAddress(),
std::make_shared<net::GraphQlHttpProcessor>(final_chain_, dag_mgr_, pbft_mgr_, trx_mgr_, db_, gas_pricer_,
as_weak(network_), conf_.genesis.chain_id));
as_weak(network_), conf_.genesis.chain_id),
conf_.network.rpc->max_pending_tasks);
graphql_http_->start();
}
}
Expand Down

0 comments on commit b5d2c1c

Please sign in to comment.