Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor(network): refine codes and comments #380

Merged
merged 4 commits into from
Jan 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions include/dsn/tool-api/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@
* THE SOFTWARE.
*/

/*
* Description:
* base interface for a network provider
*
* Revision history:
* Mar., 2015, @imzhenyu (Zhenyu Guo), first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#pragma once

#include <dsn/tool-api/task.h>
Expand Down Expand Up @@ -166,11 +157,11 @@ class connection_oriented_network : public network
DSN_API void on_server_session_accepted(rpc_session_ptr &s);
DSN_API void on_server_session_disconnected(rpc_session_ptr &s);

// server connection count threshold
DSN_API bool is_conn_threshold_exceeded(::dsn::rpc_address ep);
// Checks if IP of the incoming session has too much connections.
// Related config: [network] conn_threshold_per_ip. No limit if the value is 0.
DSN_API bool check_if_conn_threshold_exceeded(::dsn::rpc_address ep);

// client session management
DSN_API rpc_session_ptr get_client_session(::dsn::rpc_address ep);
DSN_API void on_client_session_connected(rpc_session_ptr &s);
DSN_API void on_client_session_disconnected(rpc_session_ptr &s);

Expand Down Expand Up @@ -221,7 +212,9 @@ class rpc_session : public ref_counter
virtual void connect() = 0;
virtual void close() = 0;

// Whether this session is launched on client side.
bool is_client() const { return _is_client; }

dsn::rpc_address remote_address() const { return _remote_addr; }
connection_oriented_network &net() const { return _net; }
message_parser_ptr parser() const { return _parser; }
Expand Down Expand Up @@ -295,10 +288,6 @@ class rpc_session : public ref_counter
bool set_disconnected();
void set_connected();

bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; }
bool is_connecting() const { return _connect_state == SS_CONNECTING; }
bool is_connected() const { return _connect_state == SS_CONNECTED; }

void clear_send_queue(bool resend_msgs);
bool on_disconnected(bool is_write);

Expand Down Expand Up @@ -330,4 +319,4 @@ inline bool rpc_session::delay_recv(int delay_ms)
}

/*@}*/
}
} // namespace dsn
49 changes: 23 additions & 26 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,43 +609,46 @@ void connection_oriented_network::on_server_session_accepted(rpc_session_ptr &s)

void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr &s)
{
int scount = 0;
int ecount = 0;
bool r = false;
int ip_count = 0; // how many unique client IPs
int ip_conn_count = 0; // how many connections bound to the IP of `s`
bool session_removed = false;
{
utils::auto_write_lock l(_servers_lock);
auto it = _servers.find(s->remote_address());
if (it != _servers.end() && it->second.get() == s.get()) {
_servers.erase(it);
r = true;
session_removed = true;
}
scount = (int)_servers.size();
ip_count = (int)_servers.size();

auto it2 = _ip_conn_count.find(s->remote_address().ip());
if (it2 != _ip_conn_count.end()) {
if (it2->second > 1) {
ecount = --it2->second;
it2->second -= 1;
ip_conn_count = it2->second;
} else {
_ip_conn_count.erase(it2);
}
}
}

if (r) {
ddebug("server session disconnected, remote_client = %s, current_count = %d",
if (session_removed) {
ddebug("session %s disconnected, the total client sessions count remains %d",
s->remote_address().to_string(),
scount);
ip_count);
}

if (ecount == 0)
ddebug("ip session erased, remote_client = %s", s->remote_address().to_string());
else
ddebug("ip session decreased, remote_client = %s, current_count = %d",
if (ip_conn_count == 0) {
// TODO(wutao1): print ip only
ddebug("client ip %s has no more session to this server", s->remote_address().to_string());
} else {
ddebug("client ip %s has still %d of sessions to this server",
s->remote_address().to_string(),
ecount);
ip_conn_count);
}
}

bool connection_oriented_network::is_conn_threshold_exceeded(::dsn::rpc_address ep)
bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_address ep)
{
if (_cfg_conn_threshold_per_ip <= 0) {
dinfo("new client from %s is connecting to server %s, no connection threshold",
Expand All @@ -655,34 +658,28 @@ bool connection_oriented_network::is_conn_threshold_exceeded(::dsn::rpc_address
}

bool exceeded = false;
int scount = 0;
int ip_conn_count = 0; // the amount of connections from this ip address.
{
utils::auto_read_lock l(_servers_lock);
auto it = _ip_conn_count.find(ep.ip());
if (it != _ip_conn_count.end()) {
scount = it->second;
ip_conn_count = it->second;
}
}
if (scount >= _cfg_conn_threshold_per_ip)
if (ip_conn_count >= _cfg_conn_threshold_per_ip) {
exceeded = true;
}

dinfo("new client from %s is connecting to server %s, existing connection count "
"= %d, threshold = %u",
ep.ipv4_str(),
address().to_string(),
scount,
ip_conn_count,
_cfg_conn_threshold_per_ip);

return exceeded;
}

rpc_session_ptr connection_oriented_network::get_client_session(::dsn::rpc_address ep)
{
utils::auto_read_lock l(_clients_lock);
auto it = _clients.find(ep);
return it != _clients.end() ? it->second : nullptr;
}

void connection_oriented_network::on_client_session_connected(rpc_session_ptr &s)
{
int scount = 0;
Expand Down
10 changes: 4 additions & 6 deletions src/core/tools/common/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/

#include <dsn/utility/rand.h>
#include <memory>

#include "asio_net_provider.h"
#include "asio_rpc_session.h"
Expand Down Expand Up @@ -63,7 +64,6 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");
ddebug("_cfg_conn_threshold_per_ip = %u", _cfg_conn_threshold_per_ip);

for (int i = 0; i < io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
Expand Down Expand Up @@ -126,16 +126,14 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie

rpc_session_ptr asio_network_provider::create_client_session(::dsn::rpc_address server_addr)
{
auto sock = std::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(_io_service));
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);
message_parser_ptr parser(new_message_parser(_client_hdr_format));
return rpc_session_ptr(new asio_rpc_session(*this, server_addr, sock, parser, true));
}

void asio_network_provider::do_accept()
{
auto socket = std::shared_ptr<boost::asio::ip::tcp::socket>(
new boost::asio::ip::tcp::socket(_io_service));
auto socket = std::make_shared<boost::asio::ip::tcp::socket>(_io_service);

_acceptor->async_accept(*socket, [this, socket](boost::system::error_code ec) {
if (!ec) {
Expand All @@ -156,7 +154,7 @@ void asio_network_provider::do_accept()
false);

// when server connection threshold is hit, close the session, otherwise accept it
if (is_conn_threshold_exceeded(s->remote_address())) {
if (check_if_conn_threshold_exceeded(s->remote_address())) {
dwarn("close rpc connection from %s to %s due to hitting server "
"connection threshold per ip",
s->remote_address().to_string(),
Expand Down
10 changes: 4 additions & 6 deletions src/core/tools/common/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
namespace dsn {
namespace tools {

asio_rpc_session::~asio_rpc_session() {}

void asio_rpc_session::set_options()
{
utils::auto_write_lock socket_guard(_socket_lock);
Expand Down Expand Up @@ -68,9 +66,9 @@ void asio_rpc_session::set_options()
// withheld, waiting for the ACK for the previous packet. For more, please
// refer to <https://en.wikipedia.org/wiki/Nagle's_algorithm>.
//
// Disabling the Nagle algorithm would cause these affacts:
// * decrease delay time (positive affact)
// * decrease the qps (negative affact)
// Disabling the Nagle algorithm would cause these effects:
// * decrease delay time (positive)
// * decrease the qps (negative)
_socket->set_option(boost::asio::ip::tcp::no_delay(true), ec);
if (ec)
dwarn("asio socket set option failed, error = %s", ec.message().c_str());
Expand Down Expand Up @@ -147,7 +145,7 @@ void asio_rpc_session::send(uint64_t signature)
utils::auto_read_lock socket_guard(_socket_lock);
boost::asio::async_write(
*_socket, asio_wbufs, [this, signature](boost::system::error_code ec, std::size_t length) {
if (!!ec) {
if (ec) {
derror(
"asio write to %s failed: %s", _remote_addr.to_string(), ec.message().c_str());
on_failure(true);
Expand Down
3 changes: 2 additions & 1 deletion src/core/tools/common/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
namespace dsn {
namespace tools {

// A TCP session implementation based on Boost.Asio.
// Thread-safe
class asio_rpc_session : public rpc_session
{
Expand All @@ -45,7 +46,7 @@ class asio_rpc_session : public rpc_session
message_parser_ptr &parser,
bool is_client);

~asio_rpc_session() override;
~asio_rpc_session() override = default;

void send(uint64_t signature) override;

Expand Down