Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
rpc_session: do some refactor
Browse files Browse the repository at this point in the history
1. adjust name and position of some functions in rpc session
2. rename "close_on_fault_injection" to more general "close"
3. delete a useless function
  • Loading branch information
shengofsun committed Jun 29, 2018
1 parent 044ce72 commit 9bbb3e4
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 85 deletions.
126 changes: 63 additions & 63 deletions include/dsn/tool-api/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,103 +206,103 @@ class rpc_session : public ref_counter
@addtogroup tool-api-hooks
@{
*/
DSN_API static join_point<void, rpc_session *> on_rpc_session_connected;
DSN_API static join_point<void, rpc_session *> on_rpc_session_disconnected;
static join_point<void, rpc_session *> on_rpc_session_connected;
static join_point<void, rpc_session *> on_rpc_session_disconnected;
/*@}*/
public:
DSN_API rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
bool is_client);
DSN_API virtual ~rpc_session();
rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
bool is_client);
virtual ~rpc_session();

virtual void close_on_fault_injection() = 0;
virtual void connect() = 0;
virtual void close() = 0;

DSN_API bool has_pending_out_msgs();
bool is_client() const { return _is_client; }
::dsn::rpc_address remote_address() const { return _remote_addr; }
dsn::rpc_address remote_address() const { return _remote_addr; }
connection_oriented_network &net() const { return _net; }
message_parser_ptr parser() const { return _parser; }
DSN_API void send_message(message_ex *msg);
DSN_API bool cancel(message_ex *request);

void send_message(message_ex *msg);
bool cancel(message_ex *request);
void delay_recv(int delay_ms);
DSN_API bool on_recv_message(message_ex *msg, int delay_ms);
bool on_recv_message(message_ex *msg, int delay_ms);

// for client session
public:
// return true if the socket should be closed
DSN_API bool on_disconnected(bool is_write);
protected:
///
/// for send message
///
bool unlink_message_for_send();
virtual void send(uint64_t signature) = 0;
void on_send_completed(uint64_t signature = 0);

virtual void connect() = 0;
enum session_state
{
SS_CONNECTING,
SS_CONNECTED,
SS_DISCONNECTED
};
::dsn::utils::ex_lock_nr _lock; // [
volatile session_state _connect_state;

// for server session
public:
DSN_API void start_read_next(int read_next = 256);
// messages are sent in batch, firstly all messages are linked together
// in a doubly-linked list "_messages".
// if no messages are on-the-flying, a batch of messages are fetch from the "_messages"
// and put them to _sending_msgs; meanwhile, buffers of these messages are put
// in _sending_buffers
dlink _messages;
int _message_count; // count of _messages

bool _is_sending_next;

// should be called in do_read() before using _parser when it is nullptr.
// returns:
// -1 : prepare failed, maybe because of invalid message header type
// 0 : prepare succeed, _parser is not nullptr now.
// >0 : need read more data, returns read_next.
DSN_API int prepare_parser();
std::vector<message_ex *> _sending_msgs;
std::vector<message_parser::send_buf> _sending_buffers;

uint64_t _message_sent;
// ]

// shared
protected:
//
// sending messages are put in _sending_msgs
// buffer is prepared well in _sending_buffers
// always call on_send_completed later
//
virtual void send(uint64_t signature) = 0;
///
/// for receive message
///
void start_read_next(int read_next = 256);
int prepare_parser();
virtual void do_read(int read_next) = 0;

protected:
DSN_API bool try_connecting(); // return true when it is permitted
DSN_API void set_connected();
DSN_API bool set_disconnected(); // return true when it is permitted
///
/// change status and check status
///
// return true when it is permitted
bool set_connecting();
// return true when it is permitted
bool set_disconnected();
void set_connected();

bool is_disconnected() const { return _connect_state == SS_DISCONNECTED; }
bool is_connecting() const { return _connect_state == SS_CONNECTING; }
bool is_connected() const { return _connect_state == SS_CONNECTED; }
DSN_API void on_send_completed(uint64_t signature = 0); // default value for nothing is sent

private:
// return whether there are messages for sending; should always be called in lock
DSN_API bool unlink_message_for_send();
DSN_API void clear_send_queue(bool resend_msgs);
// return whether there are messages for sending;
// should always be called in lock
void clear_send_queue(bool resend_msgs);

bool on_disconnected(bool is_write);

protected:
// constant info
connection_oriented_network &_net;
::dsn::rpc_address _remote_addr;
dsn::rpc_address _remote_addr;
int _max_buffer_block_count_per_send;
message_reader _reader;
message_parser_ptr _parser;

// messages are currently being sent
// also locked by _lock later
std::vector<message_parser::send_buf> _sending_buffers;
std::vector<message_ex *> _sending_msgs;

private:
const bool _is_client;
rpc_client_matcher *_matcher;

enum session_state
{
SS_CONNECTING,
SS_CONNECTED,
SS_DISCONNECTED
};

// TODO: expose the queue to be customizable
::dsn::utils::ex_lock_nr _lock; // [
volatile bool _is_sending_next;
int _message_count; // count of _messages
dlink _messages;
volatile session_state _connect_state;
uint64_t _message_sent;
// ]

std::atomic_int _delay_server_receive_ms;
};

Expand Down
10 changes: 2 additions & 8 deletions src/core/core/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rpc_session::~rpc_session()
}
}

bool rpc_session::try_connecting()
bool rpc_session::set_connecting()
{
dassert(is_client(), "must be client session");

Expand Down Expand Up @@ -338,12 +338,6 @@ void rpc_session::on_send_completed(uint64_t signature)
this->send(sig);
}

bool rpc_session::has_pending_out_msgs()
{
utils::auto_lock<utils::ex_lock_nr> l(_lock);
return !_messages.is_alone();
}

rpc_session::rpc_session(connection_oriented_network &net,
::dsn::rpc_address remote_addr,
message_parser_ptr &parser,
Expand Down Expand Up @@ -546,7 +540,7 @@ void connection_oriented_network::inject_drop_message(message_ex *msg, bool is_s
}

if (s != nullptr) {
s->close_on_fault_injection();
s->close();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void asio_rpc_session::safe_close()

void asio_rpc_session::connect()
{
if (try_connecting()) {
if (set_connecting()) {
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4(_remote_addr.ip()),
_remote_addr.port());

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class asio_rpc_session : public rpc_session
bool is_client);
virtual ~asio_rpc_session();
virtual void send(uint64_t signature) override { return write(signature); }
virtual void close_on_fault_injection() override { safe_close(); }
virtual void close() override { safe_close(); }

public:
virtual void connect() override;
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/common/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ sim_client_session::sim_client_session(sim_network_provider &net,

void sim_client_session::connect()
{
if (try_connecting())
if (set_connecting())
set_connected();
}

Expand Down
4 changes: 2 additions & 2 deletions src/core/tools/common/network.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class sim_client_session : public rpc_session

virtual void do_read(int sz) override {}

virtual void close_on_fault_injection() override {}
virtual void close() override {}
};

class sim_server_session : public rpc_session
Expand All @@ -71,7 +71,7 @@ class sim_server_session : public rpc_session

virtual void do_read(int sz) override {}

virtual void close_on_fault_injection() override {}
virtual void close() override {}

private:
rpc_session_ptr _client;
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.bsd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

dassert(_socket != -1, "invalid given socket handle");
Expand Down
7 changes: 1 addition & 6 deletions src/core/tools/hpc/hpc_network_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,10 @@ class hpc_network_provider : public connection_oriented_network

class hpc_rpc_session : public rpc_session
{
// client
public:
virtual void connect() override;
virtual void close() override;

// server
public:
// shared
public:
hpc_rpc_session(socket_t sock,
message_parser_ptr &parser,
Expand All @@ -114,8 +111,6 @@ class hpc_rpc_session : public rpc_session
#endif
}

virtual void close_on_fault_injection() override { close(); }

void bind_looper(io_looper *looper, bool delay = false);
virtual void do_read(int read_next) override;

Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.linux.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

dassert(_socket != -1, "invalid given socket handle");
Expand Down
2 changes: 1 addition & 1 deletion src/core/tools/hpc/hpc_network_provider.win.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ void hpc_rpc_session::on_failure(bool is_write)

void hpc_rpc_session::connect()
{
if (!try_connecting())
if (!set_connecting())
return;

_connect_event.callback = [this](int err, uint32_t io_size, uintptr_t lpolp) {
Expand Down

0 comments on commit 9bbb3e4

Please sign in to comment.