diff --git a/include/dsn/tool-api/network.h b/include/dsn/tool-api/network.h index 79aadba096..6021f2c243 100644 --- a/include/dsn/tool-api/network.h +++ b/include/dsn/tool-api/network.h @@ -206,103 +206,112 @@ class rpc_session : public ref_counter @addtogroup tool-api-hooks @{ */ - DSN_API static join_point on_rpc_session_connected; - DSN_API static join_point on_rpc_session_disconnected; + static join_point on_rpc_session_connected; + static join_point on_rpc_session_disconnected; /*@}*/ public: - DSN_API rpc_session(connection_oriented_network &net, - ::dsn::rpc_address remote_addr, - message_parser_ptr &parser, - bool is_client); - DSN_API virtual ~rpc_session(); + rpc_session(connection_oriented_network &net, + ::dsn::rpc_address remote_addr, + message_parser_ptr &parser, + bool is_client); + virtual ~rpc_session(); - virtual void close_on_fault_injection() = 0; + virtual void connect() = 0; + virtual void close() = 0; - DSN_API bool has_pending_out_msgs(); bool is_client() const { return _is_client; } - ::dsn::rpc_address remote_address() const { return _remote_addr; } + dsn::rpc_address remote_address() const { return _remote_addr; } connection_oriented_network &net() const { return _net; } message_parser_ptr parser() const { return _parser; } - DSN_API void send_message(message_ex *msg); - DSN_API bool cancel(message_ex *request); - void delay_recv(int delay_ms); - DSN_API bool on_recv_message(message_ex *msg, int delay_ms); - - // for client session -public: - // return true if the socket should be closed - DSN_API bool on_disconnected(bool is_write); - virtual void connect() = 0; + /// + /// rpc_session's interface for sending and receiving + /// + void send_message(message_ex *msg); + bool cancel(message_ex *request); + void delay_recv(int delay_ms); + bool on_recv_message(message_ex *msg, int delay_ms); - // for server session public: - DSN_API void start_read_next(int read_next = 256); - + /// + /// for subclass to implement receiving message + /// + void start_read_next(int read_next = 256); // should be called in do_read() before using _parser when it is nullptr. // returns: // -1 : prepare failed, maybe because of invalid message header type // 0 : prepare succeed, _parser is not nullptr now. // >0 : need read more data, returns read_next. - DSN_API int prepare_parser(); + int prepare_parser(); + virtual void do_read(int read_next) = 0; - // shared -protected: - // - // sending messages are put in _sending_msgs - // buffer is prepared well in _sending_buffers - // always call on_send_completed later - // + /// + /// for subclass to implement sending message + /// + // return whether there are messages for sending; + // should always be called in lock + bool unlink_message_for_send(); virtual void send(uint64_t signature) = 0; - virtual void do_read(int read_next) = 0; + void on_send_completed(uint64_t signature = 0); protected: - DSN_API bool try_connecting(); // return true when it is permitted - DSN_API void set_connected(); - DSN_API bool set_disconnected(); // return true when it is permitted + /// + /// fields related to sending messages + /// + enum session_state + { + SS_CONNECTING, + SS_CONNECTED, + SS_DISCONNECTED + }; + ::dsn::utils::ex_lock_nr _lock; // [ + volatile session_state _connect_state; + + // messages are sent in batch, firstly all messages are linked together + // in a doubly-linked list "_messages". + // if no messages are on-the-flying, a batch of messages are fetch from the "_messages" + // and put them to _sending_msgs; meanwhile, buffers of these messages are put + // in _sending_buffers + dlink _messages; + int _message_count; // count of _messages + + bool _is_sending_next; + + std::vector _sending_msgs; + std::vector _sending_buffers; + + uint64_t _message_sent; + // ] + +protected: + /// + /// change status and check status + /// + // return true when it is permitted + bool set_connecting(); + // return true when it is permitted + bool set_disconnected(); + void set_connected(); + bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; } bool is_connecting() const { return _connect_state == SS_CONNECTING; } bool is_connected() const { return _connect_state == SS_CONNECTED; } - DSN_API void on_send_completed(uint64_t signature = 0); // default value for nothing is sent -private: - // return whether there are messages for sending; should always be called in lock - DSN_API bool unlink_message_for_send(); - DSN_API void clear_send_queue(bool resend_msgs); + void clear_send_queue(bool resend_msgs); + bool on_disconnected(bool is_write); protected: // constant info connection_oriented_network &_net; - ::dsn::rpc_address _remote_addr; + dsn::rpc_address _remote_addr; int _max_buffer_block_count_per_send; message_reader _reader; message_parser_ptr _parser; - // messages are currently being sent - // also locked by _lock later - std::vector _sending_buffers; - std::vector _sending_msgs; - private: const bool _is_client; rpc_client_matcher *_matcher; - enum session_state - { - SS_CONNECTING, - SS_CONNECTED, - SS_DISCONNECTED - }; - - // TODO: expose the queue to be customizable - ::dsn::utils::ex_lock_nr _lock; // [ - volatile bool _is_sending_next; - int _message_count; // count of _messages - dlink _messages; - volatile session_state _connect_state; - uint64_t _message_sent; - // ] - std::atomic_int _delay_server_receive_ms; }; diff --git a/src/core/core/network.cpp b/src/core/core/network.cpp index e5c77d397e..8205cead6e 100644 --- a/src/core/core/network.cpp +++ b/src/core/core/network.cpp @@ -58,7 +58,7 @@ rpc_session::~rpc_session() } } -bool rpc_session::try_connecting() +bool rpc_session::set_connecting() { dassert(is_client(), "must be client session"); @@ -338,27 +338,23 @@ void rpc_session::on_send_completed(uint64_t signature) this->send(sig); } -bool rpc_session::has_pending_out_msgs() -{ - utils::auto_lock l(_lock); - return !_messages.is_alone(); -} - rpc_session::rpc_session(connection_oriented_network &net, ::dsn::rpc_address remote_addr, message_parser_ptr &parser, bool is_client) - : _net(net), + : _connect_state(is_client ? SS_DISCONNECTED : SS_CONNECTED), + _message_count(0), + _is_sending_next(false), + _message_sent(0), + + _net(net), _remote_addr(remote_addr), _max_buffer_block_count_per_send(net.max_buffer_block_count_per_send()), _reader(net.message_buffer_block_size()), _parser(parser), + _is_client(is_client), _matcher(_net.engine()->matcher()), - _is_sending_next(false), - _message_count(0), - _connect_state(is_client ? SS_DISCONNECTED : SS_CONNECTED), - _message_sent(0), _delay_server_receive_ms(0) { if (!is_client) { @@ -546,7 +542,7 @@ void connection_oriented_network::inject_drop_message(message_ex *msg, bool is_s } if (s != nullptr) { - s->close_on_fault_injection(); + s->close(); } } diff --git a/src/core/tools/common/asio_rpc_session.cpp b/src/core/tools/common/asio_rpc_session.cpp index fcbe04ba9b..cac400d22e 100644 --- a/src/core/tools/common/asio_rpc_session.cpp +++ b/src/core/tools/common/asio_rpc_session.cpp @@ -192,7 +192,7 @@ void asio_rpc_session::safe_close() void asio_rpc_session::connect() { - if (try_connecting()) { + if (set_connecting()) { boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4(_remote_addr.ip()), _remote_addr.port()); diff --git a/src/core/tools/common/asio_rpc_session.h b/src/core/tools/common/asio_rpc_session.h index ab4919a4db..8284678a0a 100644 --- a/src/core/tools/common/asio_rpc_session.h +++ b/src/core/tools/common/asio_rpc_session.h @@ -54,7 +54,7 @@ class asio_rpc_session : public rpc_session bool is_client); virtual ~asio_rpc_session(); virtual void send(uint64_t signature) override { return write(signature); } - virtual void close_on_fault_injection() override { safe_close(); } + virtual void close() override { safe_close(); } public: virtual void connect() override; diff --git a/src/core/tools/common/network.sim.cpp b/src/core/tools/common/network.sim.cpp index 8574c68d2d..acd40b012b 100644 --- a/src/core/tools/common/network.sim.cpp +++ b/src/core/tools/common/network.sim.cpp @@ -57,7 +57,7 @@ sim_client_session::sim_client_session(sim_network_provider &net, void sim_client_session::connect() { - if (try_connecting()) + if (set_connecting()) set_connected(); } diff --git a/src/core/tools/common/network.sim.h b/src/core/tools/common/network.sim.h index 97778550a6..71a713d092 100644 --- a/src/core/tools/common/network.sim.h +++ b/src/core/tools/common/network.sim.h @@ -54,7 +54,7 @@ class sim_client_session : public rpc_session virtual void do_read(int sz) override {} - virtual void close_on_fault_injection() override {} + virtual void close() override {} }; class sim_server_session : public rpc_session @@ -71,7 +71,7 @@ class sim_server_session : public rpc_session virtual void do_read(int sz) override {} - virtual void close_on_fault_injection() override {} + virtual void close() override {} private: rpc_session_ptr _client; diff --git a/src/core/tools/hpc/hpc_network_provider.bsd.cpp b/src/core/tools/hpc/hpc_network_provider.bsd.cpp index 99d4fa4a9a..7439ac3189 100644 --- a/src/core/tools/hpc/hpc_network_provider.bsd.cpp +++ b/src/core/tools/hpc/hpc_network_provider.bsd.cpp @@ -491,7 +491,7 @@ void hpc_rpc_session::on_failure(bool is_write) void hpc_rpc_session::connect() { - if (!try_connecting()) + if (!set_connecting()) return; dassert(_socket != -1, "invalid given socket handle"); diff --git a/src/core/tools/hpc/hpc_network_provider.h b/src/core/tools/hpc/hpc_network_provider.h index 48a86de291..216f324cc3 100644 --- a/src/core/tools/hpc/hpc_network_provider.h +++ b/src/core/tools/hpc/hpc_network_provider.h @@ -91,13 +91,10 @@ class hpc_network_provider : public connection_oriented_network class hpc_rpc_session : public rpc_session { - // client public: virtual void connect() override; + virtual void close() override; - // server -public: - // shared public: hpc_rpc_session(socket_t sock, message_parser_ptr &parser, @@ -114,14 +111,11 @@ class hpc_rpc_session : public rpc_session #endif } - virtual void close_on_fault_injection() override { close(); } - void bind_looper(io_looper *looper, bool delay = false); virtual void do_read(int read_next) override; private: void do_write(uint64_t signature); - void close(); void on_failure(bool is_write = false); void on_read_completed(message_ex *msg) { diff --git a/src/core/tools/hpc/hpc_network_provider.linux.cpp b/src/core/tools/hpc/hpc_network_provider.linux.cpp index 7a76cf7425..59ef836c77 100644 --- a/src/core/tools/hpc/hpc_network_provider.linux.cpp +++ b/src/core/tools/hpc/hpc_network_provider.linux.cpp @@ -484,7 +484,7 @@ void hpc_rpc_session::on_failure(bool is_write) void hpc_rpc_session::connect() { - if (!try_connecting()) + if (!set_connecting()) return; dassert(_socket != -1, "invalid given socket handle"); diff --git a/src/core/tools/hpc/hpc_network_provider.win.cpp b/src/core/tools/hpc/hpc_network_provider.win.cpp index 5bb972ddf3..a23c792ca7 100644 --- a/src/core/tools/hpc/hpc_network_provider.win.cpp +++ b/src/core/tools/hpc/hpc_network_provider.win.cpp @@ -465,7 +465,7 @@ void hpc_rpc_session::on_failure(bool is_write) void hpc_rpc_session::connect() { - if (!try_connecting()) + if (!set_connecting()) return; _connect_event.callback = [this](int err, uint32_t io_size, uintptr_t lpolp) {