Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: rpc requests limit #2863

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libraries/config/include/config/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ struct ConnectionConfig {
// Number of threads dedicated to the rpc calls processing, default = 5
uint16_t threads_num{5};

uint32_t max_pending_tasks{100};

void validate() const;
};

Expand Down
5 changes: 5 additions & 0 deletions libraries/config/src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ void dec_json(const Json::Value &json, ConnectionConfig &config) {
config.ws_port = ws_port.asUInt();
}

// max pending tasks
if (auto max_pending_tasks = getConfigData(json, {"max_pending_tasks"}, true); !max_pending_tasks.isNull()) {
config.max_pending_tasks = max_pending_tasks.asUInt();
}

// number of threads processing rpc calls
if (auto threads_num = getConfigData(json, {"threads_num"}, true); !threads_num.isNull()) {
config.threads_num = threads_num.asUInt();
Expand Down
9 changes: 7 additions & 2 deletions libraries/core_libs/network/include/network/http_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <boost/asio.hpp>
#include <boost/beast.hpp>

#include "common/thread_pool.hpp"
#include "common/types.hpp"
#include "logger/logger.hpp"

Expand All @@ -22,15 +23,17 @@ class HttpHandler;

class HttpServer : public std::enable_shared_from_this<HttpServer> {
public:
HttpServer(boost::asio::io_context& io, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr,
const std::shared_ptr<HttpProcessor>& request_processor);
HttpServer(std::shared_ptr<util::ThreadPool> thread_pool, boost::asio::ip::tcp::endpoint ep, const addr_t& node_addr,
const std::shared_ptr<HttpProcessor>& request_processor, uint32_t max_pending_tasks);

virtual ~HttpServer() { HttpServer::stop(); }

bool start();
bool stop();

void accept();
uint32_t numberOfPendingTasks() const;
bool pendingTasksOverLimit() const { return numberOfPendingTasks() > kMaxPendingTasks; }
boost::asio::io_context& getIoContext() { return io_context_; }
std::shared_ptr<HttpServer> getShared();
std::shared_ptr<HttpConnection> createConnection();
Expand All @@ -45,6 +48,8 @@ class HttpServer : public std::enable_shared_from_this<HttpServer> {
boost::asio::io_context& io_context_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::endpoint ep_;
std::weak_ptr<util::ThreadPool> thread_pool_;
const uint32_t kMaxPendingTasks;
LOG_OBJECTS_DEFINE
};
// QQ:
Expand Down
8 changes: 7 additions & 1 deletion libraries/core_libs/network/include/network/ws_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <thread>
#include <vector>

#include "common/thread_pool.hpp"
#include "config/config.hpp"
#include "dag/dag_block.hpp"
#include "final_chain/data.hpp"
Expand Down Expand Up @@ -84,7 +85,8 @@ class WsSession : public std::enable_shared_from_this<WsSession> {
// Accepts incoming connections and launches the sessions
class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::AbstractServerConnector {
public:
WsServer(boost::asio::io_context& ioc, tcp::endpoint endpoint, addr_t node_addr);
WsServer(std::shared_ptr<util::ThreadPool> thread_pool, tcp::endpoint endpoint, addr_t node_addr,
uint32_t max_pending_tasks);
virtual ~WsServer();

WsServer(const WsServer&) = delete;
Expand All @@ -101,6 +103,8 @@ class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::
void newPendingTransaction(const trx_hash_t& trx_hash);
void newPillarBlockData(const pillar_chain::PillarBlockData& pillar_block_data);
uint32_t numberOfSessions();
uint32_t numberOfPendingTasks() const;
bool pendingTasksOverLimit() const { return numberOfPendingTasks() > kMaxPendingTasks; }

virtual std::shared_ptr<WsSession> createSession(tcp::socket&& socket) = 0;

Expand All @@ -118,6 +122,8 @@ class WsServer : public std::enable_shared_from_this<WsServer>, public jsonrpc::
boost::shared_mutex sessions_mtx_;

protected:
std::weak_ptr<util::ThreadPool> thread_pool_;
uint32_t kMaxPendingTasks;
const addr_t node_addr_;
};

Expand Down
35 changes: 28 additions & 7 deletions libraries/core_libs/network/src/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

namespace taraxa::net {

HttpServer::HttpServer(boost::asio::io_context &io, boost::asio::ip::tcp::endpoint ep, const addr_t &node_addr,
const std::shared_ptr<HttpProcessor> &request_processor)
: request_processor_(request_processor), io_context_(io), acceptor_(io), ep_(std::move(ep)) {
// Constructs the HTTP server on top of a shared thread pool: both the
// io_context and the acceptor are obtained from the pool, so all connection
// handling runs on the pool's threads.
//
// thread_pool       - pool whose io_context drives this server; stored as a
//                     weak_ptr so the server does not keep the pool alive
//                     (the pool must outlive active use of the server).
// ep                - TCP endpoint (address/port) to listen on; moved into ep_.
// node_addr         - node address; presumably consumed by the LOG_OBJECTS_CREATE
//                     macro for logger setup — not referenced directly here.
// request_processor - processor invoked for each received HTTP request.
// max_pending_tasks - threshold used by pendingTasksOverLimit(); incoming
//                     requests are dropped once the pool backlog exceeds it.
HttpServer::HttpServer(std::shared_ptr<util::ThreadPool> thread_pool, boost::asio::ip::tcp::endpoint ep,
const addr_t &node_addr, const std::shared_ptr<HttpProcessor> &request_processor,
uint32_t max_pending_tasks)
: request_processor_(request_processor),
io_context_(thread_pool->unsafe_get_io_context()),
acceptor_(thread_pool->unsafe_get_io_context()),
ep_(std::move(ep)),
thread_pool_(thread_pool),
kMaxPendingTasks(max_pending_tasks) {
LOG_OBJECTS_CREATE("HTTP");
LOG(log_si_) << "Taraxa HttpServer started at port: " << ep_.port();
}
Expand Down Expand Up @@ -66,6 +72,14 @@ bool HttpServer::stop() {
return true;
}

// Returns the number of tasks currently queued on the backing thread pool,
// or 0 when the pool has already been destroyed (weak_ptr expired).
uint32_t HttpServer::numberOfPendingTasks() const {
  if (auto pool = thread_pool_.lock()) {
    return pool->num_pending_tasks();
  }
  return 0;
}

std::shared_ptr<HttpConnection> HttpConnection::getShared() {
try {
return shared_from_this();
Expand Down Expand Up @@ -98,10 +112,17 @@ void HttpConnection::read() {
} else {
assert(server_->request_processor_);
LOG(server_->log_dg_) << "Received: " << request_;
response_ = server_->request_processor_->process(request_);
boost::beast::http::async_write(
socket_, response_,
[this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); });

if (server_->pendingTasksOverLimit()) {
LOG(server_->log_er_) << "HttpConnection closed - pending tasks over the limit "
<< server_->numberOfPendingTasks();
stop();
} else {
response_ = server_->request_processor_->process(request_);
boost::beast::http::async_write(
socket_, response_,
[this_sp = getShared()](auto const & /*ec*/, auto /*bytes_transferred*/) { this_sp->stop(); });
}
}
});
}
Expand Down
24 changes: 21 additions & 3 deletions libraries/core_libs/network/src/ws_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,13 @@ void WsSession::on_read(beast::error_code ec, std::size_t bytes_transferred) {
return close(is_normal(ec));
}

LOG(log_tr_) << "WS READ " << (static_cast<char *>(read_buffer_.data().data()));
auto ws_server = ws_server_.lock();
if (ws_server && ws_server->pendingTasksOverLimit()) {
LOG(log_er_) << "WS closed - pending tasks over the limit " << ws_server->numberOfPendingTasks();
return close(true);
}

LOG(log_tr_) << "WS READ " << (static_cast<char *>(read_buffer_.data().data()));
processAsync();
// Do another read
do_read();
Expand Down Expand Up @@ -202,8 +207,13 @@ bool WsSession::is_normal(const beast::error_code &ec) const {
return false;
}

WsServer::WsServer(boost::asio::io_context &ioc, tcp::endpoint endpoint, addr_t node_addr)
: ioc_(ioc), acceptor_(ioc), node_addr_(std::move(node_addr)) {
WsServer::WsServer(std::shared_ptr<util::ThreadPool> thread_pool, tcp::endpoint endpoint, addr_t node_addr,
uint32_t max_pending_tasks)
: ioc_(thread_pool->unsafe_get_io_context()),
acceptor_(thread_pool->unsafe_get_io_context()),
thread_pool_(thread_pool),
kMaxPendingTasks(max_pending_tasks),
node_addr_(std::move(node_addr)) {
LOG_OBJECTS_CREATE("WS_SERVER");
beast::error_code ec;

Expand Down Expand Up @@ -331,4 +341,12 @@ uint32_t WsServer::numberOfSessions() {
return sessions.size();
}

// Reports the backlog of the backing thread pool; yields 0 if the pool is
// gone (the weak_ptr could not be locked).
uint32_t WsServer::numberOfPendingTasks() const {
  const auto pool = thread_pool_.lock();
  return pool ? pool->num_pending_tasks() : 0;
}

} // namespace taraxa::net
4 changes: 2 additions & 2 deletions libraries/core_libs/node/include/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class FullNode : public std::enable_shared_from_this<FullNode> {
using JsonRpcServer = ModularServer<net::TaraxaFace, net::NetFace, net::EthFace, net::TestFace, net::DebugFace>;

// should be destroyed after all components, since they may depend on it through unsafe pointers
std::unique_ptr<util::ThreadPool> rpc_thread_pool_;
std::unique_ptr<util::ThreadPool> graphql_thread_pool_;
std::shared_ptr<util::ThreadPool> rpc_thread_pool_;
std::shared_ptr<util::ThreadPool> graphql_thread_pool_;

// In case we will use config for this TP, it needs to be unique_ptr !!!
util::ThreadPool subscription_pool_;
Expand Down
24 changes: 12 additions & 12 deletions libraries/core_libs/node/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void FullNode::start() {

// Inits rpc related members
if (conf_.network.rpc) {
rpc_thread_pool_ = std::make_unique<util::ThreadPool>(conf_.network.rpc->threads_num);
rpc_thread_pool_ = std::make_shared<util::ThreadPool>(conf_.network.rpc->threads_num);
net::rpc::eth::EthParams eth_rpc_params;
eth_rpc_params.address = getAddress();
eth_rpc_params.chain_id = conf_.genesis.chain_id;
Expand Down Expand Up @@ -226,16 +226,15 @@ void FullNode::start() {
if (conf_.network.rpc->http_port) {
auto json_rpc_processor = std::make_shared<net::JsonRpcHttpProcessor>();
jsonrpc_http_ = std::make_shared<net::HttpServer>(
rpc_thread_pool_->unsafe_get_io_context(),
boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port}, getAddress(),
json_rpc_processor);
rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->http_port},
getAddress(), json_rpc_processor, conf_.network.rpc->max_pending_tasks);
jsonrpc_api_->addConnector(json_rpc_processor);
jsonrpc_http_->start();
}
if (conf_.network.rpc->ws_port) {
jsonrpc_ws_ = std::make_shared<net::JsonRpcWsServer>(
rpc_thread_pool_->unsafe_get_io_context(),
boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port}, getAddress());
rpc_thread_pool_, boost::asio::ip::tcp::endpoint{conf_.network.rpc->address, *conf_.network.rpc->ws_port},
getAddress(), conf_.network.rpc->max_pending_tasks);
jsonrpc_api_->addConnector(jsonrpc_ws_);
jsonrpc_ws_->run();
}
Expand Down Expand Up @@ -280,22 +279,23 @@ void FullNode::start() {
*rpc_thread_pool_);
}
// GraphQL servers (WS + HTTP) on their own thread pool.
// FIX: previously both servers read max_pending_tasks from the *rpc* config
// (conf_.network.rpc->max_pending_tasks). That leaked the RPC setting into
// GraphQL and dereferenced the optional rpc config inside the graphql-only
// branch — unsafe when graphql is enabled while rpc is not. Use the graphql
// config's own max_pending_tasks (ConnectionConfig provides it).
if (conf_.network.graphql) {
  graphql_thread_pool_ = std::make_shared<util::ThreadPool>(conf_.network.graphql->threads_num);
  if (conf_.network.graphql->ws_port) {
    graphql_ws_ = std::make_shared<net::GraphQlWsServer>(
        graphql_thread_pool_,
        boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->ws_port}, getAddress(),
        conf_.network.graphql->max_pending_tasks);
    // graphql_ws_->run();
  }

  if (conf_.network.graphql->http_port) {
    graphql_http_ = std::make_shared<net::HttpServer>(
        graphql_thread_pool_,
        boost::asio::ip::tcp::endpoint{conf_.network.graphql->address, *conf_.network.graphql->http_port},
        getAddress(),
        std::make_shared<net::GraphQlHttpProcessor>(final_chain_, dag_mgr_, pbft_mgr_, trx_mgr_, db_, gas_pricer_,
                                                    as_weak(network_), conf_.genesis.chain_id),
        conf_.network.graphql->max_pending_tasks);
    graphql_http_->start();
  }
}
Expand Down
Loading