From 55aec49c2ff76fa2144dd15b69edd5ca66437252 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 14 Jan 2020 11:55:50 +0800 Subject: [PATCH 1/2] refactor(network): refine codes and comments --- include/dsn/tool-api/network.h | 23 +++------- src/core/core/network.cpp | 49 ++++++++++----------- src/core/tools/common/asio_net_provider.cpp | 12 ++--- src/core/tools/common/asio_rpc_session.cpp | 10 ++--- src/core/tools/common/asio_rpc_session.h | 3 +- 5 files changed, 41 insertions(+), 56 deletions(-) diff --git a/include/dsn/tool-api/network.h b/include/dsn/tool-api/network.h index 3cc9d28bae..0b3cf6645e 100644 --- a/include/dsn/tool-api/network.h +++ b/include/dsn/tool-api/network.h @@ -24,15 +24,6 @@ * THE SOFTWARE. */ -/* - * Description: - * base interface for a network provider - * - * Revision history: - * Mar., 2015, @imzhenyu (Zhenyu Guo), first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include @@ -166,11 +157,11 @@ class connection_oriented_network : public network DSN_API void on_server_session_accepted(rpc_session_ptr &s); DSN_API void on_server_session_disconnected(rpc_session_ptr &s); - // server connection count threshold - DSN_API bool is_conn_threshold_exceeded(::dsn::rpc_address ep); + // Checks if IP of the incoming session has too much connections. + // Related config: [network] conn_threshold_per_ip. No limit if the value is 0. + DSN_API bool check_if_conn_threshold_exceeded(::dsn::rpc_address ep); // client session management - DSN_API rpc_session_ptr get_client_session(::dsn::rpc_address ep); DSN_API void on_client_session_connected(rpc_session_ptr &s); DSN_API void on_client_session_disconnected(rpc_session_ptr &s); @@ -221,7 +212,9 @@ class rpc_session : public ref_counter virtual void connect() = 0; virtual void close() = 0; + // Whether this session is launched on client side. bool is_client() const { return _is_client; } + dsn::rpc_address remote_address() const { return _remote_addr; } connection_oriented_network &net() const { return _net; } message_parser_ptr parser() const { return _parser; } @@ -295,10 +288,6 @@ class rpc_session : public ref_counter bool set_disconnected(); void set_connected(); - bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; } - bool is_connecting() const { return _connect_state == SS_CONNECTING; } - bool is_connected() const { return _connect_state == SS_CONNECTED; } - void clear_send_queue(bool resend_msgs); bool on_disconnected(bool is_write); @@ -330,4 +319,4 @@ inline bool rpc_session::delay_recv(int delay_ms) } /*@}*/ -} +} // namespace dsn diff --git a/src/core/core/network.cpp b/src/core/core/network.cpp index 832a959ba0..f2bbfbfa56 100644 --- a/src/core/core/network.cpp +++ b/src/core/core/network.cpp @@ -609,43 +609,46 @@ void connection_oriented_network::on_server_session_accepted(rpc_session_ptr &s) void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr &s) { - int scount = 0; - int ecount = 0; - bool r = false; + int ip_count = 0; // how many unique client IPs + int ip_conn_count = 0; // how many connections bound to the IP of `s` + bool session_removed = false; { utils::auto_write_lock l(_servers_lock); auto it = _servers.find(s->remote_address()); if (it != _servers.end() && it->second.get() == s.get()) { _servers.erase(it); - r = true; + session_removed = true; } - scount = (int)_servers.size(); + ip_count = (int)_servers.size(); auto it2 = _ip_conn_count.find(s->remote_address().ip()); if (it2 != _ip_conn_count.end()) { if (it2->second > 1) { - ecount = --it2->second; + it2->second -= 1; + ip_conn_count = it2->second; } else { _ip_conn_count.erase(it2); } } } - if (r) { - ddebug("server session disconnected, remote_client = %s, current_count = %d", + if (session_removed) { + ddebug("session %s disconnected, the total client sessions count remains %d", s->remote_address().to_string(), - scount); + ip_count); } - if (ecount == 0) - ddebug("ip session erased, remote_client = %s", s->remote_address().to_string()); - else - ddebug("ip session decreased, remote_client = %s, current_count = %d", + if (ip_conn_count == 0) { + // TODO(wutao1): print ip only + ddebug("client ip %s has no more session to this server", s->remote_address().to_string()); + } else { + ddebug("client ip %s has still %d of sessions to this server", s->remote_address().to_string(), - ecount); + ip_conn_count); + } } -bool connection_oriented_network::is_conn_threshold_exceeded(::dsn::rpc_address ep) +bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_address ep) { if (_cfg_conn_threshold_per_ip <= 0) { dinfo("new client from %s is connecting to server %s, no connection threshold", @@ -655,34 +658,28 @@ bool connection_oriented_network::is_conn_threshold_exceeded(::dsn::rpc_address } bool exceeded = false; - int scount = 0; + int ip_conn_count = 0; // the amount of connections from this ip address. { utils::auto_read_lock l(_servers_lock); auto it = _ip_conn_count.find(ep.ip()); if (it != _ip_conn_count.end()) { - scount = it->second; + ip_conn_count = it->second; } } - if (scount >= _cfg_conn_threshold_per_ip) + if (ip_conn_count >= _cfg_conn_threshold_per_ip) { exceeded = true; + } dinfo("new client from %s is connecting to server %s, existing connection count " "= %d, threshold = %u", ep.ipv4_str(), address().to_string(), - scount, + ip_conn_count, _cfg_conn_threshold_per_ip); return exceeded; } -rpc_session_ptr connection_oriented_network::get_client_session(::dsn::rpc_address ep) -{ - utils::auto_read_lock l(_clients_lock); - auto it = _clients.find(ep); - return it != _clients.end() ? it->second : nullptr; -} - void connection_oriented_network::on_client_session_connected(rpc_session_ptr &s) { int scount = 0; diff --git a/src/core/tools/common/asio_net_provider.cpp b/src/core/tools/common/asio_net_provider.cpp index e76d10c35e..98c10eb15a 100644 --- a/src/core/tools/common/asio_net_provider.cpp +++ b/src/core/tools/common/asio_net_provider.cpp @@ -25,6 +25,7 @@ */ #include +#include #include "asio_net_provider.h" #include "asio_rpc_session.h" @@ -63,7 +64,6 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie // get connection threshold from config, default value 0 means no threshold _cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64( "network", "conn_threshold_per_ip", 0, "max connection count to each server per ip"); - ddebug("_cfg_conn_threshold_per_ip = %u", _cfg_conn_threshold_per_ip); for (int i = 0; i < io_service_worker_count; i++) { _workers.push_back(std::make_shared([this, i]() { @@ -126,16 +126,16 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr) { - auto sock = std::shared_ptr( - new boost::asio::ip::tcp::socket(_io_service)); + auto sock = std::make_shared( + _io_service); message_parser_ptr parser(new_message_parser(_client_hdr_format)); return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true)); } void asio_network_provider::do_accept() { - auto socket = std::shared_ptr( - new boost::asio::ip::tcp::socket(_io_service)); + auto socket = std::make_shared( + _io_service); _acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) { if (!ec) { @@ -156,7 +156,7 @@ void asio_network_provider::do_accept() false); // when server connection threshold is hit, close the session, otherwise accept it - if (is_conn_threshold_exceeded(s->remote_address())) { + if (check_if_conn_threshold_exceeded(s->remote_address())) { dwarn("close rpc connection from %s to %s due to hitting server " "connection threshold per ip", s->remote_address().to_string(), diff --git a/src/core/tools/common/asio_rpc_session.cpp b/src/core/tools/common/asio_rpc_session.cpp index 570d635ddc..f68187485c 100644 --- a/src/core/tools/common/asio_rpc_session.cpp +++ b/src/core/tools/common/asio_rpc_session.cpp @@ -29,8 +29,6 @@ namespace dsn { namespace tools { -asio_rpc_session::~asio_rpc_session() {} - void asio_rpc_session::set_options() { utils::auto_write_lock socket_guard(_socket_lock); @@ -68,9 +66,9 @@ void asio_rpc_session::set_options() // withheld, waiting for the ACK for the previous packet. For more, please // refer to . // - // Disabling the Nagle algorithm would cause these affacts: - // * decrease delay time (positive affact) - // * decrease the qps (negative affact) + // Disabling the Nagle algorithm would cause these effects: + // * decrease delay time (positive) + // * decrease the qps (negative) _socket->set_option(boost::asio::ip::tcp::no_delay(true), ec); if (ec) dwarn("asio socket set option failed, error = %s", ec.message().c_str()); @@ -147,7 +145,7 @@ void asio_rpc_session::send(uint64_t signature) utils::auto_read_lock socket_guard(_socket_lock); boost::asio::async_write( *_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) { - if (!!ec) { + if (ec) { derror( "asio write to %s failed: %s", _remote_addr.to_string(), ec.message().c_str()); on_failure(true); diff --git a/src/core/tools/common/asio_rpc_session.h b/src/core/tools/common/asio_rpc_session.h index d3457f0a54..574eb23054 100644 --- a/src/core/tools/common/asio_rpc_session.h +++ b/src/core/tools/common/asio_rpc_session.h @@ -35,6 +35,7 @@ namespace dsn { namespace tools { +// A TCP session implementation based on Boost.Asio. // Thread-safe class asio_rpc_session : public rpc_session { @@ -45,7 +46,7 @@ class asio_rpc_session : public rpc_session message_parser_ptr &parser, bool is_client); - ~asio_rpc_session() override; + ~asio_rpc_session() override = default; void send(uint64_t signature) override; From 20c87f069af7e8be21974885f7a42b549c152297 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Tue, 14 Jan 2020 12:03:16 +0800 Subject: [PATCH 2/2] code foramt --- src/core/tools/common/asio_net_provider.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/core/tools/common/asio_net_provider.cpp b/src/core/tools/common/asio_net_provider.cpp index 98c10eb15a..6448028d35 100644 --- a/src/core/tools/common/asio_net_provider.cpp +++ b/src/core/tools/common/asio_net_provider.cpp @@ -126,16 +126,14 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr) { - auto sock = std::make_shared( - _io_service); + auto sock = std::make_shared(_io_service); message_parser_ptr parser(new_message_parser(_client_hdr_format)); return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true)); } void asio_network_provider::do_accept() { - auto socket = std::make_shared( - _io_service); + auto socket = std::make_shared(_io_service); _acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) { if (!ec) {