From 7f543c44fe24e42e07b533265471ad754caa8099 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Tue, 12 Dec 2017 12:16:11 +0800 Subject: [PATCH 1/7] rpc_address: remove c interface of dsn_address_t merge the structure of dsn_address_t to rpc_address.h, make code less confused --- include/dsn/c/api_common.h | 2 - include/dsn/c/api_layer1.h | 149 +-------- include/dsn/cpp/address.h | 309 ------------------ include/dsn/cpp/clientlet.h | 5 +- include/dsn/cpp/rpc_holder.h | 2 +- include/dsn/cpp/serialization.h | 2 +- .../cpp/serialization_helper/thrift_helper.h | 8 +- include/dsn/cpp/service_app.h | 2 +- .../dsn/dist/failure_detector_multimaster.h | 3 +- .../replication/replication_service_app.h | 1 + .../dsn/dist/replication/storage_serverlet.h | 2 +- include/dsn/service_api_cpp.h | 2 +- .../dsn/tool-api}/group_address.h | 24 +- include/dsn/tool-api/network.h | 2 +- .../{dist => tool-api}/partition_resolver.h | 4 +- include/dsn/tool-api/rpc_address.h | 243 ++++++++++++++ include/dsn/tool-api/rpc_message.h | 18 +- .../dsn/tool-api}/uri_address.h | 11 +- src/apps/skv/simple_kv.app.example.h | 6 +- src/core/core/address.cpp | 235 ++++++------- src/core/core/clientlet.cpp | 10 +- src/core/core/command_manager.cpp | 2 +- src/core/core/network.cpp | 9 +- src/core/core/partition_resolver_simple.h | 5 +- src/core/core/rpc_engine.cpp | 28 +- src/core/core/rpc_message.cpp | 35 +- src/core/core/service_api_c.cpp | 16 +- src/core/core/service_engine.cpp | 2 +- src/core/core/uri_address.cpp | 12 +- src/core/perf.tests/aio.cpp | 4 +- src/core/perf.tests/rpc.cpp | 2 +- src/core/perf.tests/task_queue.cpp | 6 +- src/core/perf.tests/test_utils.h | 2 +- src/core/tests/address.cpp | 263 +++++++-------- src/core/tests/hpc_io_looper.cpp | 2 +- src/core/tests/hpc_tail_logger.cpp | 2 +- src/core/tests/rpc.cpp | 58 ++-- src/core/tests/service_api_c.cpp | 6 +- src/core/tests/test_utils.h | 6 +- src/core/tools/common/dsn_message_parser.cpp | 6 +- src/core/tools/common/explorer.cpp | 10 +- .../failure_detector_multimaster.cpp | 42 ++- .../replication/client_lib/fs_manager.cpp | 12 +- .../ddl_lib/replication_ddl_client.cpp | 11 +- src/dist/replication/lib/replica.cpp | 2 +- src/dist/replication/lib/replica_2pc.cpp | 2 +- src/dist/replication/lib/replica_config.cpp | 3 +- src/dist/replication/lib/replica_stub.cpp | 4 +- .../replication/meta_server/meta_service.cpp | 2 +- .../replication/meta_server/meta_service.h | 4 +- .../replication/test/simple_kv/client.cpp | 18 +- src/dist/replication/test/simple_kv/client.h | 2 +- src/tests/dsn/failure_detector.cpp | 2 +- 53 files changed, 676 insertions(+), 944 deletions(-) delete mode 100644 include/dsn/cpp/address.h rename {src/core/core => include/dsn/tool-api}/group_address.h (91%) rename include/dsn/{dist => tool-api}/partition_resolver.h (97%) create mode 100644 include/dsn/tool-api/rpc_address.h rename {src/core/core => include/dsn/tool-api}/uri_address.h (94%) diff --git a/include/dsn/c/api_common.h b/include/dsn/c/api_common.h index b0b333ab94..4ca7e85567 100644 --- a/include/dsn/c/api_common.h +++ b/include/dsn/c/api_common.h @@ -62,5 +62,3 @@ typedef void *dsn_handle_t; typedef void *dsn_task_t; typedef void *dsn_task_tracker_t; typedef void *dsn_message_t; -typedef void *dsn_group_t; -typedef void *dsn_uri_t; diff --git a/include/dsn/c/api_layer1.h b/include/dsn/c/api_layer1.h index d4e8760557..e9b22b14e8 100644 --- a/include/dsn/c/api_layer1.h +++ b/include/dsn/c/api_layer1.h @@ -39,6 +39,7 @@ #include #include #include +#include /*! @defgroup service-api-c Core Service API @@ -433,123 +434,6 @@ replace the underneath implementation of the network (e.g., RDMA, simulated netw @{ */ -/*! -@defgroup rpc-addr RPC Address Utilities - -RPC Address Utilities - -@{ -*/ - -/*! rpc address host type */ -typedef enum dsn_host_type_t { - HOST_TYPE_INVALID = 0, - HOST_TYPE_IPV4 = 1, ///< 4 bytes for IPv4 - HOST_TYPE_GROUP = 2, ///< reference to an address group object - HOST_TYPE_URI = 3, ///< universal resource identifier as a string -} dsn_host_type_t; - -/*! rpc address, which is always encoded into a 64-bit integer */ -typedef struct dsn_address_t -{ - union u_t - { - struct - { - unsigned long long type : 2; - unsigned long long padding : 14; - unsigned long long port : 16; - unsigned long long ip : 32; - } v4; ///< \ref HOST_TYPE_IPV4 - struct - { - unsigned long long type : 2; - unsigned long long uri : 62; - } uri; ///< \ref HOST_TYPE_URI - struct - { - unsigned long long type : 2; - unsigned long long group : 62; ///< dsn_group_t - } group; ///< \ref HOST_TYPE_GROUP - uint64_t value; - } u; -} dsn_address_t; - -/*! translate from hostname to ipv4 in host machine order */ -extern DSN_API uint32_t dsn_ipv4_from_host(const char *name); - -/*! get local ipv4 according to the given network interface name */ -extern DSN_API uint32_t dsn_ipv4_local(const char *network_interface); - -/*! build a RPC address from given host name or IPV4 string, and port */ -extern DSN_API dsn_address_t dsn_address_build(const char *host, uint16_t port); - -/*! build a RPC address from a given ipv4 in host machine order and port */ -extern DSN_API dsn_address_t dsn_address_build_ipv4(uint32_t ipv4, uint16_t port); - -/*! build a RPC address from a group address (created using \ref dsn_group_build) */ -extern DSN_API dsn_address_t dsn_address_build_group(dsn_group_t g); - -/*! build a RPC address from a URI address (created using \ref dsn_uri_build) */ -extern DSN_API dsn_address_t dsn_address_build_uri(dsn_uri_t uri); - -/*! dump a RPC address to a meaningful string for logging purpose */ -extern DSN_API const char *dsn_address_to_string(dsn_address_t addr); - -/*! build URI address from a string URL, must be destroyed later using \ref dsn_uri_destroy */ -extern DSN_API dsn_uri_t dsn_uri_build(const char *url); - -/*! build URI address from another, must be destroyed later using \ref dsn_uri_destroy */ -extern DSN_API dsn_uri_t dsn_uri_clone(dsn_uri_t uri); - -/*! destroy a URI address */ -extern DSN_API void dsn_uri_destroy(dsn_uri_t uri); - -/*! build a group address with a name, must be destroyed later using \ref dsn_group_destroy */ -extern DSN_API dsn_group_t dsn_group_build(const char *name); - -/*! clone a group address from another, must be destroyed later using \ref dsn_group_destroy */ -extern DSN_API dsn_group_t dsn_group_clone(dsn_group_t g); - -/*! get the RPC address count contained in the group address */ -extern DSN_API int dsn_group_count(dsn_group_t g); - -/*! add an RPC address into the group address */ -extern DSN_API bool dsn_group_add(dsn_group_t g, dsn_address_t ep); - -/*! remove an RPC address into the group address */ -extern DSN_API bool dsn_group_remove(dsn_group_t g, dsn_address_t ep); - -/*! set an RPC address as the leader in the group address */ -extern DSN_API void dsn_group_set_leader(dsn_group_t g, dsn_address_t ep); - -/*! get leader from the group address */ -extern DSN_API dsn_address_t dsn_group_get_leader(dsn_group_t g); - -/*! check whether the given endpoint is the leader in the group */ -extern DSN_API bool dsn_group_is_leader(dsn_group_t g, dsn_address_t ep); - -/*! whether auto-update of the leader in rDSN runtime is allowed, default is true */ -extern DSN_API bool dsn_group_is_update_leader_automatically(dsn_group_t g); - -/*! set auto-update mode of the leader in rDSN runtime for this group address, true for yes */ -extern DSN_API void dsn_group_set_update_leader_automatically(dsn_group_t g, bool v); - -/*! get the next address in the group right after (circularly) given ep, if ep is invalid, a random - * member is returned */ -extern DSN_API dsn_address_t dsn_group_next(dsn_group_t g, dsn_address_t ep); - -/*! set the next address after (circularly) the current leader as the group leader */ -extern DSN_API dsn_address_t dsn_group_forward_leader(dsn_group_t g); - -/*! destroy the group address object */ -extern DSN_API void dsn_group_destroy(dsn_group_t g); - -/*! get the primary address of the rpc engine attached to the current thread */ -extern DSN_API dsn_address_t dsn_primary_address(); - -/*@}*/ - /*! @defgroup rpc-msg RPC Message Utilities @@ -593,6 +477,8 @@ rpc message read/write @{ */ +extern DSN_API dsn::rpc_address dsn_primary_address(); + /*! create a rpc request message @@ -685,19 +571,6 @@ typedef struct dsn_msg_options_t dsn_msg_context_t context; ///< see \ref dsn_msg_context_t } dsn_msg_options_t; -/*! make sure type sizes match as we simply use uint64_t across language boundaries */ -inline void dsn_address_size_checker() -{ - static_assert(sizeof(dsn_address_t) == sizeof(uint64_t), - "sizeof(dsn_address_t) must equal to sizeof(uint64_t)"); - - static_assert(sizeof(dsn_msg_context_t) == sizeof(uint64_t), - "sizeof(dsn_msg_context_t) must equal to sizeof(uint64_t)"); - - static_assert(sizeof(dsn::gpid) == sizeof(uint64_t), - "sizeof(dsn::gpid) must equal to sizeof(uint64_t)"); -} - /*! set options for the given message @@ -727,10 +600,10 @@ extern DSN_API size_t dsn_msg_body_size(dsn_message_t msg); extern DSN_API void *dsn_msg_rw_ptr(dsn_message_t msg, size_t offset_begin); /*! get from-address where the message is sent */ -extern DSN_API dsn_address_t dsn_msg_from_address(dsn_message_t msg); +extern DSN_API dsn::rpc_address dsn_msg_from_address(dsn_message_t msg); /*! get to-address where the message is sent to */ -extern DSN_API dsn_address_t dsn_msg_to_address(dsn_message_t msg); +extern DSN_API dsn::rpc_address dsn_msg_to_address(dsn_message_t msg); /*! get trace id of the message */ extern DSN_API uint64_t dsn_msg_trace_id(dsn_message_t msg); @@ -800,7 +673,7 @@ extern DSN_API void *dsn_rpc_unregiser_handler(dsn::task_code code, extern DSN_API void dsn_rpc_reply(dsn_message_t response, dsn::error_code err DEFAULT(dsn::ERR_OK)); /*! forward the request to another server instead */ -extern DSN_API void dsn_rpc_forward(dsn_message_t request, dsn_address_t addr); +extern DSN_API void dsn_rpc_forward(dsn_message_t request, dsn::rpc_address addr); /*@}*/ @@ -853,16 +726,16 @@ dsn_rpc_create_response_task_ex(dsn_message_t request, dsn_task_tracker_t tracker DEFAULT(nullptr)); /*! client invokes the RPC call */ -extern DSN_API void dsn_rpc_call(dsn_address_t server, dsn_task_t rpc_call); +extern DSN_API void dsn_rpc_call(dsn::rpc_address server, dsn_task_t rpc_call); /*! client invokes the RPC call and waits for its response, note returned msg must be explicitly released using \ref dsn_msg_release_ref */ -extern DSN_API dsn_message_t dsn_rpc_call_wait(dsn_address_t server, dsn_message_t request); +extern DSN_API dsn_message_t dsn_rpc_call_wait(dsn::rpc_address server, dsn_message_t request); /*! one-way RPC from client, no rpc response is expected */ -extern DSN_API void dsn_rpc_call_one_way(dsn_address_t server, dsn_message_t request); +extern DSN_API void dsn_rpc_call_one_way(dsn::rpc_address server, dsn_message_t request); /*! get response message from the response task, note @@ -1006,7 +879,7 @@ extern DSN_API void dsn_file_write_vector(dsn_handle_t file, \param high_priority true means copy in high priority. \param cb callback aio task to be executed on completion */ -extern DSN_API void dsn_file_copy_remote_directory(dsn_address_t remote, +extern DSN_API void dsn_file_copy_remote_directory(dsn::rpc_address remote, const char *source_dir, const char *dest_dir, bool overwrite, @@ -1025,7 +898,7 @@ extern DSN_API void dsn_file_copy_remote_directory(dsn_address_t remote, \param high_priority true means copy in high priority. \param cb callback aio task to be executed on completion */ -extern DSN_API void dsn_file_copy_remote_files(dsn_address_t remote, +extern DSN_API void dsn_file_copy_remote_files(dsn::rpc_address remote, const char *source_dir, const char **source_files, const char *dest_dir, diff --git a/include/dsn/cpp/address.h b/include/dsn/cpp/address.h deleted file mode 100644 index 29ca2209c7..0000000000 --- a/include/dsn/cpp/address.h +++ /dev/null @@ -1,309 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright (c) 2015 Microsoft Corporation - * - * -=- Robust Distributed System Nucleus (rDSN) -=- - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -/* - * Description: - * define address helper routines around dsn_address_t structure - * - * Revision history: - * July, 2015, @imzhenyu (Zhenyu Guo), first version - * Aug., 2015, @imzhenyu (Zhenyu Guo), add group and uri address support - * xxxx-xx-xx, author, fix bug about xxx - */ - -#pragma once - -#include -#include -#include -#include -#include -#include // for strcmp() -#include -#include -#include - -#ifdef DSN_USE_THRIFT_SERIALIZATION -#include -#endif - -namespace dsn { -/*! -@addtogroup rpc-addr -@{ -*/ -class rpc_group_address; -class rpc_uri_address; - -class rpc_address -{ -public: - ~rpc_address() { clear(); } - - rpc_address(uint32_t ip, uint16_t port); - rpc_address(const char *host, uint16_t port); - - void assign_ipv4(uint32_t ip, uint16_t port); - void assign_ipv4(const char *host, uint16_t port); - void assign_ipv4_local_address(const char *card_interface, uint16_t port); - void assign_uri(dsn_uri_t uri); - void assign_group(dsn_group_t g); - - rpc_address(); - rpc_address(const rpc_address &addr); - rpc_address(dsn_address_t addr); - rpc_address &operator=(dsn_address_t addr); - - rpc_address clone() const; - const char *to_string() const; - std::string to_std_string() const; - bool from_string_ipv4(const char *s); - dsn_host_type_t type() const { return (dsn_host_type_t)_addr.u.v4.type; } - dsn_address_t c_addr() const { return _addr; } - dsn_address_t *c_addr_ptr() { return &_addr; } - uint32_t ip() const { return (uint32_t)_addr.u.v4.ip; } - uint16_t port() const { return (uint16_t)_addr.u.v4.port; } - rpc_group_address *group_address() const - { - return (rpc_group_address *)(uintptr_t)_addr.u.group.group; - } - dsn_group_t group_handle() const { return (dsn_group_t)(uintptr_t)_addr.u.group.group; } - rpc_uri_address *uri_address() const { return (rpc_uri_address *)(uintptr_t)_addr.u.uri.uri; } - dsn_uri_t uri_handle() const { return (dsn_group_t)(uintptr_t)_addr.u.uri.uri; } - bool is_invalid() const { return _addr.u.v4.type == HOST_TYPE_INVALID; } - void set_invalid() { clear(); } - - bool operator==(::dsn::rpc_address r) const; - bool operator!=(::dsn::rpc_address r) const; - bool operator<(::dsn::rpc_address r) const; - -#ifdef DSN_USE_THRIFT_SERIALIZATION - uint32_t read(::apache::thrift::protocol::TProtocol *iprot); - uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; -#endif -private: - void clear(); - -private: - dsn_address_t _addr; -}; - -class url_host_address : public rpc_address -{ -public: - // dsn://mycluster/myapp, or host-name:port - url_host_address(const char *url_or_host_port); - url_host_address() {} - -private: - std::string _url_host; //< make sure the buffer is valid -}; - -// ------------- inline implementation ------------------- -inline rpc_address::rpc_address(uint32_t ip, uint16_t port) -{ - assign_ipv4(ip, port); - - static_assert(sizeof(rpc_address) == sizeof(dsn_address_t), - "make sure rpc_address does not " - "add new payload to dsn_address_t " - "to keep it sizeof(uint64_t)"); -} - -inline rpc_address::rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); } - -inline void rpc_address::assign_ipv4(uint32_t ip, uint16_t port) -{ - clear(); - _addr.u.v4.type = HOST_TYPE_IPV4; - _addr.u.v4.ip = ip; - _addr.u.v4.port = port; -} - -inline void rpc_address::assign_ipv4(const char *host, uint16_t port) -{ - clear(); - _addr.u.v4.type = HOST_TYPE_IPV4; - _addr.u.v4.ip = dsn_ipv4_from_host(host); - _addr.u.v4.port = port; -} - -inline void rpc_address::assign_ipv4_local_address(const char *network_interface, uint16_t port) -{ - clear(); - _addr.u.v4.type = HOST_TYPE_IPV4; - _addr.u.v4.ip = dsn_ipv4_local(network_interface); - _addr.u.v4.port = port; -} - -inline void rpc_address::assign_uri(dsn_uri_t uri) -{ - clear(); - _addr.u.v4.type = HOST_TYPE_URI; - _addr.u.uri.uri = (uint64_t)uri; -} - -inline void rpc_address::assign_group(dsn_group_t g) -{ - clear(); - _addr.u.v4.type = HOST_TYPE_GROUP; - _addr.u.group.group = (uint64_t)g; -} - -inline rpc_address::rpc_address() -{ - _addr.u.value = 0; - _addr.u.v4.type = HOST_TYPE_INVALID; -} - -inline rpc_address::rpc_address(const rpc_address &addr) { _addr = addr._addr; } - -inline rpc_address::rpc_address(dsn_address_t addr) { _addr = addr; } - -inline rpc_address &rpc_address::operator=(dsn_address_t addr) -{ - _addr = addr; - return *this; -} - -inline bool rpc_address::operator==(::dsn::rpc_address r) const -{ - if (_addr.u.v4.type != r.type()) - return false; - - switch (_addr.u.v4.type) { - case HOST_TYPE_IPV4: - return _addr.u.v4.ip == r.ip() && _addr.u.v4.port == r.port(); - case HOST_TYPE_URI: - return strcmp(to_string(), r.to_string()) == 0; - case HOST_TYPE_GROUP: - return _addr.u.group.group == r.c_addr().u.group.group; - default: - return true; - } -} - -inline bool rpc_address::operator!=(::dsn::rpc_address r) const { return !(*this == r); } - -inline bool rpc_address::operator<(::dsn::rpc_address r) const -{ - if (_addr.u.v4.type != r.type()) - return _addr.u.v4.type < r.type(); - - switch (_addr.u.v4.type) { - case HOST_TYPE_IPV4: - return _addr.u.v4.ip < r.ip() || (_addr.u.v4.ip == r.ip() && _addr.u.v4.port < r.port()); - case HOST_TYPE_URI: - return strcmp(to_string(), r.to_string()) < 0; - case HOST_TYPE_GROUP: - return _addr.u.group.group < r.c_addr().u.group.group; - default: - return true; - } -} - -inline void rpc_address::clear() { _addr.u.value = 0; } - -inline rpc_address rpc_address::clone() const -{ - rpc_address addr; - switch (_addr.u.v4.type) { - case HOST_TYPE_IPV4: - addr = this->c_addr(); - break; - case HOST_TYPE_URI: - addr.assign_uri(dsn_uri_clone(this->uri_handle())); - break; - case HOST_TYPE_GROUP: - addr.assign_group(dsn_group_clone(this->group_handle())); - break; - default: - break; - } - return addr; -} - -inline const char *rpc_address::to_string() const { return dsn_address_to_string(_addr); } - -inline std::string rpc_address::to_std_string() const { return std::string(to_string()); } - -inline bool rpc_address::from_string_ipv4(const char *s) -{ - std::string str = std::string(s); - auto pos = str.find_last_of(':'); - if (pos == std::string::npos) - return false; - else { - auto host = str.substr(0, pos); - auto port_str = str.substr(pos + 1); - char *p = nullptr; - long port = ::strtol(port_str.data(), &p, 10); - if (*p != 0) // bad string - return false; - if (port <= 0 || port > UINT16_MAX) // out of range - return false; - assign_ipv4(host.c_str(), (uint16_t)port); - return true; - } -} - -inline url_host_address::url_host_address(const char *url_or_host_port) -{ - std::string s(url_or_host_port); - auto sp = s.find(':'); - if (sp != std::string::npos) { - uint16_t port = (uint16_t)atoi(s.substr(sp + 1).c_str()); - if (port == 0) { - _url_host = std::string(url_or_host_port); - assign_uri(dsn_uri_build(_url_host.c_str())); - } else { - s = s.substr(0, sp); - assign_ipv4(s.c_str(), port); - } - } -} -/*@}*/ -} - -namespace std { -template <> -struct hash<::dsn::rpc_address> -{ - size_t operator()(const ::dsn::rpc_address &ep) const - { - switch (ep.type()) { - case HOST_TYPE_IPV4: - return std::hash()(ep.ip()) ^ std::hash()(ep.port()); - case HOST_TYPE_URI: - return std::hash()(std::string(ep.to_string())); - case HOST_TYPE_GROUP: - return std::hash()(ep.group_address()); - default: - return 0; - } - } -}; -} diff --git a/include/dsn/cpp/clientlet.h b/include/dsn/cpp/clientlet.h index 27ba2665f2..939e26bbc6 100644 --- a/include/dsn/cpp/clientlet.h +++ b/include/dsn/cpp/clientlet.h @@ -35,6 +35,7 @@ #pragma once +#include #include #include @@ -274,7 +275,7 @@ task_ptr call(::dsn::rpc_address server, { task_ptr t = create_rpc_response_task( request, svc, std::forward(callback), reply_thread_hash); - dsn_rpc_call(server.c_addr(), t->native_handle()); + dsn_rpc_call(server, t->native_handle()); return t; } @@ -350,7 +351,7 @@ void call_one_way_typed(::dsn::rpc_address server, { dsn_message_t msg = dsn_msg_create_request(code, 0, thread_hash, partition_hash); ::dsn::marshall(msg, req); - dsn_rpc_call_one_way(server.c_addr(), msg); + dsn_rpc_call_one_way(server, msg); } template diff --git a/include/dsn/cpp/rpc_holder.h b/include/dsn/cpp/rpc_holder.h index f9417772ff..b6d03148a5 100644 --- a/include/dsn/cpp/rpc_holder.h +++ b/include/dsn/cpp/rpc_holder.h @@ -155,7 +155,7 @@ class rpc_holder cb_fwd(err); }, reply_thread_hash); - dsn_rpc_call(server.c_addr(), t->native_handle()); + dsn_rpc_call(server, t->native_handle()); return t; } diff --git a/include/dsn/cpp/serialization.h b/include/dsn/cpp/serialization.h index f254efde38..bd64da87f2 100644 --- a/include/dsn/cpp/serialization.h +++ b/include/dsn/cpp/serialization.h @@ -36,7 +36,7 @@ #pragma once #include -#include +#include #include #ifdef DSN_USE_THRIFT_SERIALIZATION diff --git a/include/dsn/cpp/serialization_helper/thrift_helper.h b/include/dsn/cpp/serialization_helper/thrift_helper.h index ec812de166..ac3c7a1eb8 100644 --- a/include/dsn/cpp/serialization_helper/thrift_helper.h +++ b/include/dsn/cpp/serialization_helper/thrift_helper.h @@ -235,8 +235,8 @@ inline uint32_t rpc_address::read(apache::thrift::protocol::TProtocol *iprot) dynamic_cast(iprot); if (binary_proto != nullptr) { // the protocol is binary protocol - auto r = iprot->readI64(reinterpret_cast(_addr.u.value)); - dassert(_addr.u.v4.type == HOST_TYPE_INVALID || _addr.u.v4.type == HOST_TYPE_IPV4, + auto r = iprot->readI64(reinterpret_cast(_addr.value)); + dassert(_addr.v4.type == HOST_TYPE_INVALID || _addr.v4.type == HOST_TYPE_IPV4, "only invalid or ipv4 can be deserialized from binary"); return r; } else { @@ -295,9 +295,9 @@ inline uint32_t rpc_address::write(apache::thrift::protocol::TProtocol *oprot) c dynamic_cast(oprot); if (binary_proto != nullptr) { // the protocol is binary protocol - dassert(_addr.u.v4.type == HOST_TYPE_INVALID || _addr.u.v4.type == HOST_TYPE_IPV4, + dassert(_addr.v4.type == HOST_TYPE_INVALID || _addr.v4.type == HOST_TYPE_IPV4, "only invalid or ipv4 can be serialized to binary"); - return oprot->writeI64((int64_t)_addr.u.value); + return oprot->writeI64((int64_t)_addr.value); } else { // the protocol is json protocol std::string host(this->to_string()); diff --git a/include/dsn/cpp/service_app.h b/include/dsn/cpp/service_app.h index ef3e87bcb7..302d8f5af9 100644 --- a/include/dsn/cpp/service_app.h +++ b/include/dsn/cpp/service_app.h @@ -37,7 +37,7 @@ #include #include -#include +#include #include #include #include diff --git a/include/dsn/dist/failure_detector_multimaster.h b/include/dsn/dist/failure_detector_multimaster.h index b62439eb08..5fa8f16009 100644 --- a/include/dsn/dist/failure_detector_multimaster.h +++ b/include/dsn/dist/failure_detector_multimaster.h @@ -35,6 +35,7 @@ #pragma once +#include #include #include #include @@ -81,7 +82,7 @@ class slave_failure_detector_with_multimaster : public dsn::fd::failure_detector inline ::dsn::rpc_address slave_failure_detector_with_multimaster::current_server_contact() const { service::zauto_lock l(failure_detector::_lock); - return dsn_group_get_leader(_meta_servers.group_handle()); + return _meta_servers.group_address()->leader(); } } } // end namespace diff --git a/include/dsn/dist/replication/replication_service_app.h b/include/dsn/dist/replication/replication_service_app.h index fdec8755b0..057970c59c 100644 --- a/include/dsn/dist/replication/replication_service_app.h +++ b/include/dsn/dist/replication/replication_service_app.h @@ -35,6 +35,7 @@ #pragma once +#include #include dsn::error_code dsn_layer2_stateful_type1_bridge(int argc, char **argv); diff --git a/include/dsn/dist/replication/storage_serverlet.h b/include/dsn/dist/replication/storage_serverlet.h index 400a427848..18af9fcab2 100644 --- a/include/dsn/dist/replication/storage_serverlet.h +++ b/include/dsn/dist/replication/storage_serverlet.h @@ -82,7 +82,7 @@ class storage_serverlet : public dsn::clientlet dassert(false, "recv message with unhandled rpc name %s from %s, trace_id = %016" PRIx64, t.to_string(), - dsn_address_to_string(dsn_msg_from_address(request)), + dsn_msg_from_address(request).to_string(), dsn_msg_trace_id(request)); dsn_rpc_reply(dsn_msg_create_response(request), ::dsn::ERR_HANDLER_NOT_FOUND); } diff --git a/include/dsn/service_api_cpp.h b/include/dsn/service_api_cpp.h index 901c608b74..6753934349 100644 --- a/include/dsn/service_api_cpp.h +++ b/include/dsn/service_api_cpp.h @@ -46,5 +46,5 @@ #include #include #include -#include +#include #include diff --git a/src/core/core/group_address.h b/include/dsn/tool-api/group_address.h similarity index 91% rename from src/core/core/group_address.h rename to include/dsn/tool-api/group_address.h index 293fd12037..715d2809a9 100644 --- a/src/core/core/group_address.h +++ b/include/dsn/tool-api/group_address.h @@ -35,12 +35,15 @@ #pragma once -#include +#include +#include +#include #include -#include // for std::find() +#include +#include namespace dsn { -class rpc_group_address +class rpc_group_address : public dsn::ref_counter { public: rpc_group_address(const char *name); @@ -52,26 +55,24 @@ class rpc_group_address bool contains(rpc_address addr); int count(); - dsn_group_t handle() const { return (dsn_group_t)this; } const std::vector &members() const { return _members; } rpc_address random_member() const { alr_t l(_lock); - return _members.empty() ? _invalid + return _members.empty() ? rpc_address::sInvalid : _members[dsn_random32(0, (uint32_t)_members.size() - 1)]; } rpc_address next(rpc_address current) const; rpc_address leader() const { alr_t l(_lock); - return _leader_index >= 0 ? _members[_leader_index] : _invalid; + return _leader_index >= 0 ? _members[_leader_index] : rpc_address::sInvalid; } void leader_forward(); rpc_address possible_leader(); bool is_update_leader_automatically() const { return _update_leader_automatically; } void set_update_leader_automatically(bool value) { _update_leader_automatically = value; } const char *name() const { return _name.c_str(); } - rpc_address address() const { return _group_address; } private: typedef std::vector members_t; @@ -83,8 +84,6 @@ class rpc_group_address int _leader_index; bool _update_leader_automatically; std::string _name; - rpc_address _group_address; - static const rpc_address _invalid; }; // ------------------ inline implementation -------------------- @@ -94,7 +93,6 @@ inline rpc_group_address::rpc_group_address(const char *name) _name = name; _leader_index = -1; _update_leader_automatically = true; - _group_address.assign_group(handle()); } inline rpc_group_address::rpc_group_address(const rpc_group_address &other) @@ -103,7 +101,6 @@ inline rpc_group_address::rpc_group_address(const rpc_group_address &other) _leader_index = other._leader_index; _update_leader_automatically = other._update_leader_automatically; _members = other._members; - _group_address.assign_group(handle()); } inline rpc_group_address &rpc_group_address::operator=(const rpc_group_address &other) @@ -112,7 +109,6 @@ inline rpc_group_address &rpc_group_address::operator=(const rpc_group_address & _leader_index = other._leader_index; _update_leader_automatically = other._update_leader_automatically; _members = other._members; - _group_address.assign_group(handle()); return *this; } @@ -160,7 +156,7 @@ inline rpc_address rpc_group_address::possible_leader() { alr_t l(_lock); if (_members.empty()) - return _invalid; + return rpc_address::sInvalid; if (_leader_index == -1) _leader_index = dsn_random32(0, (uint32_t)_members.size() - 1); return _members[_leader_index]; @@ -196,7 +192,7 @@ inline rpc_address rpc_group_address::next(rpc_address current) const { alr_t l(_lock); if (_members.empty()) - return _invalid; + return rpc_address::sInvalid; if (current.is_invalid()) return _members[dsn_random32(0, (uint32_t)_members.size() - 1)]; else { diff --git a/include/dsn/tool-api/network.h b/include/dsn/tool-api/network.h index fb5edf0329..79aadba096 100644 --- a/include/dsn/tool-api/network.h +++ b/include/dsn/tool-api/network.h @@ -38,7 +38,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/include/dsn/dist/partition_resolver.h b/include/dsn/tool-api/partition_resolver.h similarity index 97% rename from include/dsn/dist/partition_resolver.h rename to include/dsn/tool-api/partition_resolver.h index 31ceec0476..245ac1bd2a 100644 --- a/include/dsn/dist/partition_resolver.h +++ b/include/dsn/tool-api/partition_resolver.h @@ -35,8 +35,10 @@ #pragma once -#include #include +#include +#include +#include namespace dsn { namespace dist { diff --git a/include/dsn/tool-api/rpc_address.h b/include/dsn/tool-api/rpc_address.h new file mode 100644 index 0000000000..07836d3130 --- /dev/null +++ b/include/dsn/tool-api/rpc_address.h @@ -0,0 +1,243 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +/* + * Description: + * define address helper routines + * + * Revision history: + * July, 2015, @imzhenyu (Zhenyu Guo), first version + * Aug., 2015, @imzhenyu (Zhenyu Guo), add group and uri address support + * xxxx-xx-xx, author, fix bug about xxx + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#ifdef DSN_USE_THRIFT_SERIALIZATION +#include +#endif + +typedef enum dsn_host_type_t { + HOST_TYPE_INVALID = 0, + HOST_TYPE_IPV4 = 1, + HOST_TYPE_GROUP = 2, + HOST_TYPE_URI = 3, +} dsn_host_type_t; + +namespace dsn { + +class rpc_group_address; +class rpc_uri_address; + +class rpc_address +{ +public: + static const rpc_address sInvalid; + static uint32_t ipv4_from_host(const char *hostname); + static uint32_t ipv4_from_network_interface(const char *network_interface); + ~rpc_address(); + + rpc_address() = default; + + rpc_address(const rpc_address &another); + + rpc_address &operator=(const rpc_address &another); + + rpc_address(uint32_t ip, uint16_t port) + { + assign_ipv4(ip, port); + static_assert(sizeof(rpc_address) == sizeof(uint64_t), + "make sure rpc_address does not " + "add new payload to dsn::rpc_address " + "to keep it sizeof(uint64_t)"); + } + + rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); } + + void assign_ipv4(uint32_t ip, uint16_t port) + { + clear(); + _addr.v4.type = HOST_TYPE_IPV4; + _addr.v4.ip = ip; + _addr.v4.port = port; + } + + void assign_ipv4(const char *host, uint16_t port) + { + clear(); + _addr.v4.type = HOST_TYPE_IPV4; + _addr.v4.ip = rpc_address::ipv4_from_host(host); + _addr.v4.port = port; + } + + void assign_ipv4_local_address(const char *network_interface, uint16_t port) + { + clear(); + _addr.v4.type = HOST_TYPE_IPV4; + _addr.v4.ip = rpc_address::ipv4_from_network_interface(network_interface); + _addr.v4.port = port; + } + + void assign_uri(const char *host_uri); + void assign_group(const char *name); + + rpc_address clone() const; + const char *to_string() const; + + std::string to_std_string() const { return std::string(to_string()); } + bool from_string_ipv4(const char *s) + { + clear(); + std::string str = std::string(s); + auto pos = str.find_last_of(':'); + if (pos == std::string::npos) + return false; + else { + auto host = str.substr(0, pos); + auto port_str = str.substr(pos + 1); + char *p = nullptr; + long port = ::strtol(port_str.data(), &p, 10); + if (*p != 0) // bad string + return false; + if (port <= 0 || port > UINT16_MAX) // out of range + return false; + assign_ipv4(host.c_str(), (uint16_t)port); + return true; + } + } + + uint64_t &value() { return _addr.value; } + + dsn_host_type_t type() const { return (dsn_host_type_t)_addr.v4.type; } + uint32_t ip() const { return (uint32_t)_addr.v4.ip; } + uint16_t port() const { return (uint16_t)_addr.v4.port; } + void set_port(uint16_t port) { _addr.v4.port = port; } + rpc_group_address *group_address() const + { + return (rpc_group_address *)(uintptr_t)_addr.group.group; + } + rpc_uri_address *uri_address() const { return (rpc_uri_address *)(uintptr_t)_addr.uri.uri; } + bool is_invalid() const { return _addr.v4.type == HOST_TYPE_INVALID; } + void set_invalid() { clear(); } + + bool operator==(::dsn::rpc_address r) const + { + if (type() != r.type()) + return false; + + switch (type()) { + case HOST_TYPE_IPV4: + return ip() == r.ip() && _addr.v4.port == r.port(); + case HOST_TYPE_URI: + return strcmp(to_string(), r.to_string()) == 0; + case HOST_TYPE_GROUP: + return _addr.group.group == r._addr.group.group; + default: + return true; + } + } + + bool operator!=(::dsn::rpc_address r) const { return !(*this == r); } + + bool operator<(::dsn::rpc_address r) const + { + if (type() != r.type()) + return type() < r.type(); + + switch (type()) { + case HOST_TYPE_IPV4: + return ip() < r.ip() || (ip() == r.ip() && port() < r.port()); + case HOST_TYPE_URI: + return strcmp(to_string(), r.to_string()) < 0; + case HOST_TYPE_GROUP: + return _addr.group.group < r._addr.group.group; + default: + return true; + } + } + +#ifdef DSN_USE_THRIFT_SERIALIZATION + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; +#endif + +private: + // before you assign new value, must call clear() to release original value + // and you MUST ensure that _addr is INITIALIZED before you call this function + void clear(); + +private: + union + { + struct + { + unsigned long long type : 2; + unsigned long long padding : 14; + unsigned long long port : 16; + unsigned long long ip : 32; + } v4; ///< \ref HOST_TYPE_IPV4 + struct + { + unsigned long long type : 2; + unsigned long long uri : 62; + } uri; ///< \ref HOST_TYPE_URI + struct + { + unsigned long long type : 2; + unsigned long long group : 62; ///< dsn_group_t + } group; ///< \ref HOST_TYPE_GROUP + uint64_t value; + } _addr{.value = 0}; +}; +} + +namespace std { +template <> +struct hash<::dsn::rpc_address> +{ + size_t operator()(const ::dsn::rpc_address &ep) const + { + switch (ep.type()) { + case HOST_TYPE_IPV4: + return std::hash()(ep.ip()) ^ std::hash()(ep.port()); + case HOST_TYPE_URI: + return std::hash()(std::string(ep.to_string())); + case HOST_TYPE_GROUP: + return std::hash()(ep.group_address()); + default: + return 0; + } + } +}; +} diff --git a/include/dsn/tool-api/rpc_message.h b/include/dsn/tool-api/rpc_message.h index e1f656eaaf..f1ce3959fd 100644 --- a/include/dsn/tool-api/rpc_message.h +++ b/include/dsn/tool-api/rpc_message.h @@ -40,10 +40,10 @@ #include #include #include +#include #include #include -#include -#include +#include #include namespace dsn { @@ -79,11 +79,15 @@ typedef struct message_header dsn::gpid gpid; // global partition id dsn_msg_context_t context; - // always ipv4/v6 address, + // Attention: + // here, from_address must be IPv4 address, namely we can regard from_address as a + // POD-type structure, so no memory-leak will occur even if we don't call it's + // destructor. + // // generally, it is the from_node's primary address, except the // case described in message_ex::create_response()'s ATTENTION comment. - // the from_address is always the orignal client's address, it will - // not be changed in forwarding request. + // + // in the forwarding case, the from_address is always the orignal client's address rpc_address from_address; struct @@ -98,6 +102,9 @@ typedef struct message_header char error_name[DSN_MAX_ERROR_CODE_NAME_LENGTH]; fast_code error_code; // dsn::error_code } server; + + message_header() = default; + ~message_header() = default; } message_header; class message_ex : public ref_counter, @@ -161,6 +168,7 @@ class message_ex : public ref_counter, private: DSN_API message_ex(); DSN_API void prepare_buffer_header(); + DSN_API void release_buffer_header(); private: static std::atomic _id; diff --git a/src/core/core/uri_address.h b/include/dsn/tool-api/uri_address.h similarity index 94% rename from src/core/core/uri_address.h rename to include/dsn/tool-api/uri_address.h index 4a10eb4409..3ccf69bd8d 100644 --- a/src/core/core/uri_address.h +++ b/include/dsn/tool-api/uri_address.h @@ -35,14 +35,16 @@ #pragma once -#include -#include -#include // for std::find() +#include #include +#include +#include +#include +#include namespace dsn { /** A RPC URI address. */ -class rpc_uri_address +class rpc_uri_address : public dsn::ref_counter { public: /** @@ -79,7 +81,6 @@ class rpc_uri_address private: ::dsn::dist::partition_resolver_ptr _resolver; std::string _uri; - rpc_address _uri_address; }; class uri_resolver diff --git a/src/apps/skv/simple_kv.app.example.h b/src/apps/skv/simple_kv.app.example.h index 5e47ffeb32..e1a169ae41 100644 --- a/src/apps/skv/simple_kv.app.example.h +++ b/src/apps/skv/simple_kv.app.example.h @@ -57,7 +57,7 @@ class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::cl return ::dsn::ERR_INVALID_PARAMETERS; // argv[1]: e.g., dsn://mycluster/simple-kv.instance0 - _server = url_host_address(args[1].c_str()); + _server.assign_uri(args[1].c_str()); _simple_kv_client.reset(new simple_kv_client2(_server)); _timer = ::dsn::tasking::enqueue_timer( @@ -120,7 +120,7 @@ class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::cl private: ::dsn::task_ptr _timer; - ::dsn::url_host_address _server; + ::dsn::rpc_address _server; std::unique_ptr _simple_kv_client; }; @@ -138,7 +138,7 @@ class simple_kv_perf_test_client_app : public ::dsn::service_app, public virtual // argv[1]: e.g., dsn://mycluster/simple-kv.instance0 rpc_address service_addr; - service_addr.assign_uri(dsn_uri_build(args[1].c_str())); + service_addr.assign_uri(args[1].c_str()); _simple_kv_client.reset(new simple_kv_perf_test_client(service_addr)); _simple_kv_client->start_test("simple_kv.perf-test.case", 3); diff --git a/src/core/core/address.cpp b/src/core/core/address.cpp index 108dd9cafe..0625858dee 100644 --- a/src/core/core/address.cpp +++ b/src/core/core/address.cpp @@ -57,15 +57,16 @@ #include #include -#include -#include + +#include + +#include +#include +#include #include -#include "group_address.h" -#include "uri_address.h" namespace dsn { -const rpc_address rpc_group_address::_invalid; -} +const rpc_address rpc_address::sInvalid; #ifdef _WIN32 static void net_init() @@ -82,8 +83,8 @@ static void net_init() } #endif -// name to ip etc. -DSN_API uint32_t dsn_ipv4_from_host(const char *name) +/*static*/ +uint32_t rpc_address::ipv4_from_host(const char *name) { sockaddr_in addr; memset(&addr, 0, sizeof(addr)); @@ -111,9 +112,10 @@ DSN_API uint32_t dsn_ipv4_from_host(const char *name) return (uint32_t)ntohl(addr.sin_addr.s_addr); } +/*static*/ // if network_interface is "", then return the first // site-local ipv4 address: 10.*.*.*, 172.16.*.*, 192.168.*.* -DSN_API uint32_t dsn_ipv4_local(const char *network_interface) +DSN_API uint32_t rpc_address::ipv4_from_network_interface(const char *network_interface) { uint32_t ret = 0; @@ -153,8 +155,91 @@ DSN_API uint32_t dsn_ipv4_local(const char *network_interface) return ret; } +rpc_address::~rpc_address() { clear(); } + +rpc_address::rpc_address(const rpc_address &another) { *this = another; } + +rpc_address &rpc_address::operator=(const rpc_address &another) +{ + if (this == &another) { + // avoid memory leak + return *this; + } + clear(); + _addr = another._addr; + switch (another.type()) { + case HOST_TYPE_GROUP: + group_address()->add_ref(); + break; + case HOST_TYPE_URI: + uri_address()->add_ref(); + break; + default: + break; + } + return *this; +} + +void rpc_address::assign_uri(const char *host_uri) +{ + clear(); + _addr.uri.type = HOST_TYPE_URI; + dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(host_uri); + // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct + addr->add_ref(); + _addr.uri.uri = (uint64_t)addr; +} + +void rpc_address::assign_group(const char *name) +{ + clear(); + _addr.group.type = HOST_TYPE_GROUP; + dsn::rpc_group_address *addr = new dsn::rpc_group_address(name); + // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct + addr->add_ref(); + _addr.group.group = (uint64_t)addr; +} + +rpc_address rpc_address::clone() const +{ + rpc_address new_address; + + if (type() == HOST_TYPE_IPV4) { + new_address._addr = _addr; + } else if (type() == HOST_TYPE_URI) { + dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(*uri_address()); + addr->add_ref(); + new_address._addr.uri.uri = (uint64_t)addr; + new_address._addr.uri.type = HOST_TYPE_URI; + } else if (type() == HOST_TYPE_GROUP) { + dsn::rpc_group_address *addr = new dsn::rpc_group_address(*group_address()); + addr->add_ref(); + new_address._addr.group.group = (uint64_t)addr; + new_address._addr.group.type = HOST_TYPE_GROUP; + } else { + new_address.clear(); + } + + return new_address; +} + +void rpc_address::clear() +{ + switch (type()) { + case HOST_TYPE_GROUP: + group_address()->release_ref(); + break; + case HOST_TYPE_URI: + uri_address()->release_ref(); + break; + default: + break; + } + _addr.value = 0; +} + static __thread fixed_size_buffer_pool<8, 256> bf; -DSN_API const char *dsn_address_to_string(dsn_address_t addr) +const char *rpc_address::to_string() const { char *p = bf.next(); auto sz = bf.get_chunk_size(); @@ -165,23 +250,23 @@ DSN_API const char *dsn_address_to_string(dsn_address_t addr) int ip_len; #endif - switch (addr.u.v4.type) { + switch (_addr.v4.type) { case HOST_TYPE_IPV4: - net_addr.s_addr = htonl((uint32_t)addr.u.v4.ip); + net_addr.s_addr = htonl(ip()); #ifdef _WIN32 ip_str = inet_ntoa(net_addr); snprintf_p(p, sz, "%s:%hu", ip_str, (uint16_t)addr.u.v4.port); #else inet_ntop(AF_INET, &net_addr, p, sz); ip_len = strlen(p); - snprintf_p(p + ip_len, sz - ip_len, ":%hu", (uint16_t)addr.u.v4.port); + snprintf_p(p + ip_len, sz - ip_len, ":%hu", port()); #endif break; case HOST_TYPE_URI: - p = (char *)(uintptr_t)addr.u.uri.uri; + p = (char *)uri_address()->uri(); break; case HOST_TYPE_GROUP: - p = (char *)(((dsn::rpc_group_address *)(uintptr_t)(addr.u.group.group))->name()); + p = (char *)group_address()->name(); break; default: p = (char *)"invalid address"; @@ -190,124 +275,4 @@ DSN_API const char *dsn_address_to_string(dsn_address_t addr) return (const char *)p; } - -DSN_API dsn_address_t dsn_address_build(const char *host, uint16_t port) -{ - dsn::rpc_address addr(host, port); - return addr.c_addr(); -} - -DSN_API dsn_address_t dsn_address_build_ipv4(uint32_t ipv4, uint16_t port) -{ - dsn::rpc_address addr(ipv4, port); - return addr.c_addr(); -} - -DSN_API dsn_address_t dsn_address_build_group(dsn_group_t g) -{ - dsn::rpc_address addr; - addr.assign_group(g); - return addr.c_addr(); -} - -DSN_API dsn_address_t dsn_address_build_uri(dsn_uri_t uri) -{ - dsn::rpc_address addr; - addr.assign_uri(uri); - return addr.c_addr(); -} - -DSN_API dsn_group_t dsn_group_build(const char *name) // must be paired with release later -{ - return new ::dsn::rpc_group_address(name); -} - -DSN_API dsn_group_t dsn_group_clone(dsn_group_t g) // must be paired with release later -{ - auto grp = (::dsn::rpc_group_address *)(g); - return new ::dsn::rpc_group_address(*grp); -} - -DSN_API int dsn_group_count(dsn_group_t g) -{ - auto grp = (::dsn::rpc_group_address *)(g); - return grp->count(); -} - -DSN_API bool dsn_group_add(dsn_group_t g, dsn_address_t ep) -{ - auto grp = (::dsn::rpc_group_address *)(g); - ::dsn::rpc_address addr(ep); - return grp->add(addr); -} - -DSN_API void dsn_group_set_leader(dsn_group_t g, dsn_address_t ep) -{ - auto grp = (::dsn::rpc_group_address *)(g); - ::dsn::rpc_address addr(ep); - grp->set_leader(addr); -} - -DSN_API dsn_address_t dsn_group_get_leader(dsn_group_t g) -{ - auto grp = (::dsn::rpc_group_address *)(g); - return grp->leader().c_addr(); -} - -DSN_API bool dsn_group_is_leader(dsn_group_t g, dsn_address_t ep) -{ - auto grp = (::dsn::rpc_group_address *)(g); - return grp->leader() == ep; } - -DSN_API bool dsn_group_is_update_leader_automatically(dsn_group_t g) -{ - auto grp = (::dsn::rpc_group_address *)(g); - return grp->is_update_leader_automatically(); -} - -DSN_API void dsn_group_set_update_leader_automatically(dsn_group_t g, bool v) -{ - auto grp = (::dsn::rpc_group_address *)(g); - grp->set_update_leader_automatically(v); -} - -DSN_API dsn_address_t dsn_group_next(dsn_group_t g, dsn_address_t ep) -{ - auto grp = (::dsn::rpc_group_address *)(g); - ::dsn::rpc_address addr(ep); - return grp->next(addr).c_addr(); -} - -DSN_API dsn_address_t dsn_group_forward_leader(dsn_group_t g) -{ - auto grp = (::dsn::rpc_group_address *)(g); - grp->leader_forward(); - return grp->leader().c_addr(); -} - -DSN_API bool dsn_group_remove(dsn_group_t g, dsn_address_t ep) -{ - auto grp = (::dsn::rpc_group_address *)(g); - ::dsn::rpc_address addr(ep); - return grp->remove(addr); -} - -DSN_API void dsn_group_destroy(dsn_group_t g) -{ - auto grp = (::dsn::rpc_group_address *)(g); - delete grp; -} - -DSN_API dsn_uri_t dsn_uri_build(const char *url) // must be paired with destroy later -{ - return (dsn_uri_t) new ::dsn::rpc_uri_address(url); -} - -DSN_API dsn_uri_t dsn_uri_clone(dsn_uri_t uri) // must be paired with destroy later -{ - auto u = (::dsn::rpc_uri_address *)(uri); - return (dsn_uri_t) new ::dsn::rpc_uri_address(*u); -} - -DSN_API void dsn_uri_destroy(dsn_uri_t uri) { delete (::dsn::rpc_uri_address *)(uri); } diff --git a/src/core/core/clientlet.cpp b/src/core/core/clientlet.cpp index eb1b65eb63..3dedd805b2 100644 --- a/src/core/core/clientlet.cpp +++ b/src/core/core/clientlet.cpp @@ -93,12 +93,8 @@ void copy_remote_files_impl(::dsn::rpc_address remote, dsn_task_t native_task) { if (files.empty()) { - dsn_file_copy_remote_directory(remote.c_addr(), - source_dir.c_str(), - dest_dir.c_str(), - overwrite, - high_priority, - native_task); + dsn_file_copy_remote_directory( + remote, source_dir.c_str(), dest_dir.c_str(), overwrite, high_priority, native_task); } else { const char **ptr = (const char **)alloca(sizeof(const char *) * (files.size() + 1)); const char **ptr_base = ptr; @@ -107,7 +103,7 @@ void copy_remote_files_impl(::dsn::rpc_address remote, } *ptr = nullptr; - dsn_file_copy_remote_files(remote.c_addr(), + dsn_file_copy_remote_files(remote, source_dir.c_str(), ptr_base, dest_dir.c_str(), diff --git a/src/core/core/command_manager.cpp b/src/core/core/command_manager.cpp index 48387729ec..4f6dd562c9 100644 --- a/src/core/core/command_manager.cpp +++ b/src/core/core/command_manager.cpp @@ -160,7 +160,7 @@ bool command_manager::run_command(const std::string &cmd, rcmd.cmd = cmd; rcmd.arguments = args; ::dsn::marshall(msg, rcmd); - auto resp = dsn_rpc_call_wait(h->address.c_addr(), msg); + auto resp = dsn_rpc_call_wait(h->address, msg); if (resp != nullptr) { ::dsn::unmarshall(resp, output); return true; diff --git a/src/core/core/network.cpp b/src/core/core/network.cpp index 0bb9cad34f..21948da21f 100644 --- a/src/core/core/network.cpp +++ b/src/core/core/network.cpp @@ -506,17 +506,16 @@ uint32_t network::get_local_ipv4() "primary_interface", "", "network interface name used to init primary ipv4 " - "address, if empty, means using the first \"eth\" " - "prefixed non-loopback ipv4 address"); + "address, if empty, means using a site local address"); uint32_t ip = 0; if (strlen(explicit_host) > 0) { - ip = dsn_ipv4_from_host(explicit_host); + ip = rpc_address::ipv4_from_host(explicit_host); } if (0 == ip) { - ip = dsn_ipv4_local(inteface); + ip = rpc_address::ipv4_from_network_interface(inteface); } if (0 == ip) { @@ -524,7 +523,7 @@ uint32_t network::get_local_ipv4() if (gethostname(name, sizeof(name)) != 0) { dassert(false, "gethostname failed, err = %s", strerror(errno)); } - ip = dsn_ipv4_from_host(name); + ip = rpc_address::ipv4_from_host(name); } return ip; diff --git a/src/core/core/partition_resolver_simple.h b/src/core/core/partition_resolver_simple.h index dde79df83c..ddd305171d 100644 --- a/src/core/core/partition_resolver_simple.h +++ b/src/core/core/partition_resolver_simple.h @@ -35,8 +35,11 @@ #pragma once -#include +#include +#include #include +#include +#include namespace dsn { namespace dist { diff --git a/src/core/core/rpc_engine.cpp b/src/core/core/rpc_engine.cpp index bf688625cd..32a863ae83 100644 --- a/src/core/core/rpc_engine.cpp +++ b/src/core/core/rpc_engine.cpp @@ -45,12 +45,13 @@ #include "rpc_engine.h" #include "service_engine.h" -#include "group_address.h" -#include "uri_address.h" -#include #include +#include +#include +#include #include #include +#include #include namespace dsn { @@ -605,8 +606,8 @@ error_code rpc_engine::start(const service_app_spec &aspec, io_modifer &ctx) _uri_resolver_mgr.reset(new uri_resolver_manager()); _local_primary_address = _client_nets[NET_HDR_DSN][0]->address(); - _local_primary_address.c_addr_ptr()->u.v4.port = - aspec.ports.size() > 0 ? *aspec.ports.begin() : aspec.id + ctx.port_shift_value; + _local_primary_address.set_port(aspec.ports.size() > 0 ? *aspec.ports.begin() + : aspec.id + ctx.port_shift_value); ddebug("=== service_node=[%s], primary_address=[%s] ===", _node->full_name(), @@ -760,15 +761,14 @@ void rpc_engine::call_uri(rpc_address addr, message_ex *request, rpc_response_ta ctask->add_ref(); // released later after dsn_rpc_call // sleep 10 milliseconds before retry - tasking::enqueue( - LPC_RPC_DELAY_CALL, - nullptr, - [ server = req2->server_address.c_addr(), ctask ]() { - dsn_rpc_call(server, ctask); - ctask->release_ref(); // added when set-retry - }, - 0, - std::chrono::milliseconds(gap)); + tasking::enqueue(LPC_RPC_DELAY_CALL, + nullptr, + [ server = req2->server_address, ctask ]() { + dsn_rpc_call(server, ctask); + ctask->release_ref(); // added when set-retry + }, + 0, + std::chrono::milliseconds(gap)); return; } else { derror("service access failed (%s), no more time for further " diff --git a/src/core/core/rpc_message.cpp b/src/core/core/rpc_message.cpp index ec118dcdc3..8ca71e32ce 100644 --- a/src/core/core/rpc_message.cpp +++ b/src/core/core/rpc_message.cpp @@ -119,14 +119,14 @@ DSN_API void dsn_msg_add_ref(dsn_message_t msg) { ((::dsn::message_ex *)msg)->ad DSN_API void dsn_msg_release_ref(dsn_message_t msg) { ((::dsn::message_ex *)msg)->release_ref(); } -DSN_API dsn_address_t dsn_msg_from_address(dsn_message_t msg) +DSN_API dsn::rpc_address dsn_msg_from_address(dsn_message_t msg) { - return ((::dsn::message_ex *)msg)->header->from_address.c_addr(); + return ((::dsn::message_ex *)msg)->header->from_address; } -DSN_API dsn_address_t dsn_msg_to_address(dsn_message_t msg) +DSN_API dsn::rpc_address dsn_msg_to_address(dsn_message_t msg) { - return ((::dsn::message_ex *)msg)->to_address.c_addr(); + return ((::dsn::message_ex *)msg)->to_address; } DSN_API uint64_t dsn_msg_trace_id(dsn_message_t msg) @@ -214,6 +214,13 @@ message_ex::message_ex() message_ex::~message_ex() { + // when recv a request, message_header is hidden ahead of buffer(see@create_receive_message + // function), if you call copy_and_prepare_send() function, then a new request will be create, + // but new request share the same header with the old_request, so if old_request release header, + // then new request's header is invalid, so we don't release_buffer_header(), and there will not + // lead any problem, see@ Attention of message_header + + // release_buffer_header(); if (!_is_read) { dassert(_rw_committed, "message write is not committed"); } @@ -288,14 +295,15 @@ message_ex *message_ex::copy(bool clone_content, bool copy_for_receive) dassert(this->_rw_committed, "should not copy the message when read/write is not committed"); // ATTENTION: - // - if this message is a written message, set copied message's write pointer to the end, then - // you - // can continue to append data to the copied message. + // - if this message is a written message, set copied message's write pointer to the end, + // then you can continue to append data to the copied message. + // // - if this message is a read message, set copied message's read pointer to the beginning, // then you can read data from the beginning. + // // - if copy_for_receive is set, it means that we want to make a receiving message from a - // sending message. - // which is usually useful when you want to write mock for modules which use rpc. + // sending message. which is usually useful when you want to + // write mock for modules which use rpc. message_ex *msg = new message_ex(); msg->to_address = to_address; @@ -465,9 +473,18 @@ void message_ex::prepare_buffer_header() this->_rw_offset = (int)sizeof(message_header); this->buffers.push_back(buffer); + // here, we should call placement new, because message_header contain variable that is not + // POD-type, such as rpc_address, so we should call these variable's construct + new (ptr)(message_header); header = (message_header *)ptr; } +void message_ex::release_buffer_header() +{ + // message_header contain variable that is not POD-type, so we call these variable's destructor + header->~message_header(); +} + void message_ex::write_next(void **ptr, size_t *size, size_t min_size) { // printf("%p %s\n", this, __FUNCTION__); diff --git a/src/core/core/service_api_c.cpp b/src/core/core/service_api_c.cpp index 1515bc2fae..311180aabf 100644 --- a/src/core/core/service_api_c.cpp +++ b/src/core/core/service_api_c.cpp @@ -459,9 +459,9 @@ DSN_API bool dsn_semaphore_wait_timeout(dsn_handle_t s, int timeout_milliseconds //------------------------------------------------------------------------------ // rpc calls -DSN_API dsn_address_t dsn_primary_address() +DSN_API dsn::rpc_address dsn_primary_address() { - return ::dsn::task::get_current_rpc()->primary_address().c_addr(); + return ::dsn::task::get_current_rpc()->primary_address(); } DSN_API bool dsn_rpc_register_handler( @@ -522,7 +522,7 @@ DSN_API dsn_task_t dsn_rpc_create_response_task_ex(dsn_message_t request, return t; } -DSN_API void dsn_rpc_call(dsn_address_t server, dsn_task_t rpc_call) +DSN_API void dsn_rpc_call(dsn::rpc_address server, dsn_task_t rpc_call) { ::dsn::rpc_response_task *task = (::dsn::rpc_response_task *)rpc_call; dassert(task->spec().type == TASK_TYPE_RPC_RESPONSE, @@ -534,7 +534,7 @@ DSN_API void dsn_rpc_call(dsn_address_t server, dsn_task_t rpc_call) ::dsn::task::get_current_rpc()->call(msg, task); } -DSN_API dsn_message_t dsn_rpc_call_wait(dsn_address_t server, dsn_message_t request) +DSN_API dsn_message_t dsn_rpc_call_wait(dsn::rpc_address server, dsn_message_t request) { auto msg = ((::dsn::message_ex *)request); msg->server_address = server; @@ -554,7 +554,7 @@ DSN_API dsn_message_t dsn_rpc_call_wait(dsn_address_t server, dsn_message_t requ } } -DSN_API void dsn_rpc_call_one_way(dsn_address_t server, dsn_message_t request) +DSN_API void dsn_rpc_call_one_way(dsn::rpc_address server, dsn_message_t request) { auto msg = ((::dsn::message_ex *)request); msg->server_address = server; @@ -568,7 +568,7 @@ DSN_API void dsn_rpc_reply(dsn_message_t response, dsn::error_code err) ::dsn::task::get_current_rpc()->reply(msg, err); } -DSN_API void dsn_rpc_forward(dsn_message_t request, dsn_address_t addr) +DSN_API void dsn_rpc_forward(dsn_message_t request, dsn::rpc_address addr) { ::dsn::task::get_current_rpc()->forward((::dsn::message_ex *)(request), ::dsn::rpc_address(addr)); @@ -700,7 +700,7 @@ DSN_API void dsn_file_write_vector(dsn_handle_t file, ::dsn::task::get_current_disk()->write(callback); } -DSN_API void dsn_file_copy_remote_directory(dsn_address_t remote, +DSN_API void dsn_file_copy_remote_directory(dsn::rpc_address remote, const char *source_dir, const char *dest_dir, bool overwrite, @@ -720,7 +720,7 @@ DSN_API void dsn_file_copy_remote_directory(dsn_address_t remote, ::dsn::task::get_current_nfs()->call(rci, callback); } -DSN_API void dsn_file_copy_remote_files(dsn_address_t remote, +DSN_API void dsn_file_copy_remote_files(dsn::rpc_address remote, const char *source_dir, const char **source_files, const char *dest_dir, diff --git a/src/core/core/service_engine.cpp b/src/core/core/service_engine.cpp index 51897eaa4b..34c4328c30 100644 --- a/src/core/core/service_engine.cpp +++ b/src/core/core/service_engine.cpp @@ -37,7 +37,7 @@ #include "task_engine.h" #include "disk_engine.h" #include "rpc_engine.h" -#include "uri_address.h" +#include #include #include #include diff --git a/src/core/core/uri_address.cpp b/src/core/core/uri_address.cpp index c6d172f248..f0b6885727 100644 --- a/src/core/core/uri_address.cpp +++ b/src/core/core/uri_address.cpp @@ -33,7 +33,6 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include "uri_address.h" #include "rpc_engine.h" #include #include @@ -41,6 +40,8 @@ #include #include #include +#include +#include namespace dsn { void uri_resolver_manager::setup_resolvers() @@ -127,21 +128,18 @@ rpc_uri_address::rpc_uri_address(const char *uri) : _uri(uri) if (r1.get()) { _resolver = r1->get_app_resolver(get_uri_components().second.c_str()); } - _uri_address.assign_uri(this); } rpc_uri_address::rpc_uri_address(const rpc_uri_address &other) { _resolver = other._resolver; _uri = other._uri; - _uri_address.assign_uri(this); } rpc_uri_address &rpc_uri_address::operator=(const rpc_uri_address &other) { _resolver = other._resolver; _uri = other._uri; - _uri_address.assign_uri(this); return *this; } @@ -166,7 +164,7 @@ std::pair rpc_uri_address::get_uri_components() uri_resolver::uri_resolver(const char *name, const char *factory, const char *arguments) : _name(name), _factory(factory), _arguments(arguments) { - _meta_server.assign_group(dsn_group_build(name)); + _meta_server.assign_group(name); std::vector args; utils::split_args(arguments, args, ','); @@ -175,12 +173,12 @@ uri_resolver::uri_resolver(const char *name, const char *factory, const char *ar auto pos1 = arg.find_first_of(':'); if (pos1 != std::string::npos) { ::dsn::rpc_address ep(arg.substr(0, pos1).c_str(), atoi(arg.substr(pos1 + 1).c_str())); - dsn_group_add(_meta_server.group_handle(), ep.c_addr()); + _meta_server.group_address()->add(ep); } } } -uri_resolver::~uri_resolver() { dsn_group_destroy(_meta_server.group_handle()); } +uri_resolver::~uri_resolver() {} dist::partition_resolver_ptr uri_resolver::get_app_resolver(const char *app) { diff --git a/src/core/perf.tests/aio.cpp b/src/core/perf.tests/aio.cpp index 80efef3fa4..d82512a9ee 100644 --- a/src/core/perf.tests/aio.cpp +++ b/src/core/perf.tests/aio.cpp @@ -33,13 +33,13 @@ */ #include -#include +#include #include #include #include #include #include -#include "../core/group_address.h" +#include #include "test_utils.h" #include diff --git a/src/core/perf.tests/rpc.cpp b/src/core/perf.tests/rpc.cpp index c8d83e3bfd..b307d114db 100644 --- a/src/core/perf.tests/rpc.cpp +++ b/src/core/perf.tests/rpc.cpp @@ -32,7 +32,7 @@ * Revision history: * 2016-01-05, Tianyi Wang, first version */ -#include +#include #include #include #include "test_utils.h" diff --git a/src/core/perf.tests/task_queue.cpp b/src/core/perf.tests/task_queue.cpp index 75e466df6f..f189038199 100644 --- a/src/core/perf.tests/task_queue.cpp +++ b/src/core/perf.tests/task_queue.cpp @@ -32,12 +32,12 @@ * 2016-01-05, Tianyi Wang, first version */ -#include +#include #include #include #include #include -#include "../core/group_address.h" +#include #include #include "test_utils.h" #include @@ -203,4 +203,4 @@ TEST(core, task_queue_perf_test) external_blocking(enqueue_time / 10); self_iterating(enqueue_time); tic_tock_iterating(enqueue_time / 10); -} \ No newline at end of file +} diff --git a/src/core/perf.tests/test_utils.h b/src/core/perf.tests/test_utils.h index 9ea098db2f..992b1e7f0c 100644 --- a/src/core/perf.tests/test_utils.h +++ b/src/core/perf.tests/test_utils.h @@ -83,7 +83,7 @@ class test_client : public ::dsn::serverlet, public ::dsn::service_ dsn::rpc_address next_addr = dsn::service_app::primary_address(); if (next_addr.port() != TEST_PORT_END) { next_addr.assign_ipv4(next_addr.ip(), next_addr.port() + 1); - dsn_rpc_forward(message, next_addr.c_addr()); + dsn_rpc_forward(message, next_addr); } else { reply(message, std::string(next_addr.to_string())); } diff --git a/src/core/tests/address.cpp b/src/core/tests/address.cpp index e18497e2d1..8c444cdfe4 100644 --- a/src/core/tests/address.cpp +++ b/src/core/tests/address.cpp @@ -33,8 +33,9 @@ * xxxx-xx-xx, author, fix bug about xxx */ -#include -#include "../core/group_address.h" +#include +#include +#include #include using namespace ::dsn; @@ -49,230 +50,188 @@ static inline uint32_t host_ipv4(uint8_t sec1, uint8_t sec2, uint8_t sec3, uint8 return ip; } -static inline bool operator==(dsn_address_t l, dsn_address_t r) +static inline bool operator==(dsn::rpc_address l, dsn::rpc_address r) { - if (l.u.v4.type != r.u.v4.type) + if (l.type() != r.type()) return false; - switch (l.u.v4.type) { + switch (l.type()) { case HOST_TYPE_IPV4: - return l.u.v4.ip == r.u.v4.ip && l.u.v4.port == r.u.v4.port; + return l.ip() == r.ip() && l.port() == r.port(); case HOST_TYPE_URI: - return strcmp((const char *)(uintptr_t)l.u.uri.uri, (const char *)(uintptr_t)r.u.uri.uri) == - 0; + return strcmp(l.uri_address()->uri(), r.uri_address()->uri()) == 0; case HOST_TYPE_GROUP: - return l.u.group.group == r.u.group.group; + return l.group_address() == r.group_address(); default: return true; } } -TEST(core, dsn_ipv4_from_host) +TEST(core, rpc_address_ipv4_from_host) { // localhost --> 127.0.0.1 - ASSERT_EQ(host_ipv4(127, 0, 0, 1), dsn_ipv4_from_host("localhost")); + ASSERT_EQ(host_ipv4(127, 0, 0, 1), rpc_address::ipv4_from_host("localhost")); // 127.0.0.1 --> 127.0.0.1 - ASSERT_EQ(host_ipv4(127, 0, 0, 1), dsn_ipv4_from_host("127.0.0.1")); + ASSERT_EQ(host_ipv4(127, 0, 0, 1), rpc_address::ipv4_from_host("127.0.0.1")); } -TEST(core, dsn_ipv4_local) +TEST(core, rpc_address_ipv4_from_network_interface) { #ifndef _WIN32 - ASSERT_EQ(host_ipv4(127, 0, 0, 1), dsn_ipv4_local("lo")); - ASSERT_EQ(host_ipv4(0, 0, 0, 0), dsn_ipv4_local("not_exist_interface")); + ASSERT_EQ(host_ipv4(127, 0, 0, 1), rpc_address::ipv4_from_network_interface("lo")); + ASSERT_EQ(host_ipv4(0, 0, 0, 0), + rpc_address::ipv4_from_network_interface("not_exist_interface")); #endif } -TEST(core, dsn_address_to_string) +TEST(core, rpc_address_to_string) { { - dsn_address_t addr; - addr.u.v4.type = HOST_TYPE_IPV4; - addr.u.v4.ip = host_ipv4(127, 0, 0, 1); - addr.u.v4.port = 8080; - ASSERT_EQ(std::string("127.0.0.1:8080"), dsn_address_to_string(addr)); + dsn::rpc_address addr; + addr.assign_ipv4(host_ipv4(127, 0, 0, 1), 8080); + ASSERT_EQ(std::string("127.0.0.1:8080"), addr.to_std_string()); } { const char *uri = "http://localhost:8080/"; - dsn_address_t addr; - addr.u.uri.type = HOST_TYPE_URI; - addr.u.uri.uri = (uintptr_t)uri; - ASSERT_EQ(std::string(uri), dsn_address_to_string(addr)); + dsn::rpc_address addr; + addr.assign_uri(uri); + ASSERT_EQ(std::string(uri), addr.to_std_string()); } { const char *name = "test_group"; - dsn_group_t g = dsn_group_build(name); - dsn_address_t addr; - addr.u.group.type = HOST_TYPE_GROUP; - addr.u.group.group = (uint64_t)g; - ASSERT_EQ(std::string(name), dsn_address_to_string(addr)); - dsn_group_destroy(g); + dsn::rpc_address addr; + addr.assign_group(name); + ASSERT_EQ(std::string(name), addr.to_std_string()); } { - dsn_address_t addr; - addr.u.uri.type = HOST_TYPE_INVALID; - ASSERT_EQ(std::string("invalid address"), dsn_address_to_string(addr)); + dsn::rpc_address addr; + ASSERT_EQ(std::string("invalid address"), addr.to_std_string()); } } TEST(core, dsn_address_build) { { - dsn_address_t addr; - addr.u.v4.type = HOST_TYPE_IPV4; - addr.u.v4.ip = host_ipv4(127, 0, 0, 1); - addr.u.v4.port = 8080; - - ASSERT_EQ(addr, dsn_address_build("localhost", 8080)); - ASSERT_EQ(addr, dsn_address_build("127.0.0.1", 8080)); - ASSERT_EQ(addr, dsn_address_build_ipv4(host_ipv4(127, 0, 0, 1), 8080)); + dsn::rpc_address addr; + addr.assign_ipv4(host_ipv4(127, 0, 0, 1), 8080); + ASSERT_EQ(HOST_TYPE_IPV4, addr.type()); + ASSERT_EQ(host_ipv4(127, 0, 0, 1), addr.ip()); + ASSERT_EQ(8080, addr.port()); + + ASSERT_EQ(addr, dsn::rpc_address("localhost", 8080)); + ASSERT_EQ(addr, dsn::rpc_address("127.0.0.1", 8080)); + ASSERT_EQ(addr, dsn::rpc_address(host_ipv4(127, 0, 0, 1), 8080)); } { const char *uri = "http://localhost:8080/"; - dsn_uri_t u = dsn_uri_build(uri); - - dsn_address_t addr; - addr.u.uri.type = HOST_TYPE_URI; - addr.u.uri.uri = (uintptr_t)u; + dsn::rpc_address addr; + addr.assign_uri(uri); - ASSERT_EQ(addr, dsn_address_build_uri(u)); - dsn_uri_destroy(u); + ASSERT_EQ(addr.type(), HOST_TYPE_URI); + ASSERT_STREQ(uri, addr.uri_address()->uri()); + ASSERT_EQ(1, addr.uri_address()->get_count()); } { const char *name = "test_group"; - dsn_group_t g = dsn_group_build(name); - dsn_address_t addr; - addr.u.group.type = HOST_TYPE_GROUP; - addr.u.group.group = (uint64_t)g; - ASSERT_EQ(addr, dsn_address_build_group(g)); - dsn_group_destroy(g); + dsn::rpc_address addr; + addr.assign_group(name); + + ASSERT_EQ(HOST_TYPE_GROUP, addr.type()); + ASSERT_STREQ(name, addr.group_address()->name()); + ASSERT_EQ(1, addr.group_address()->get_count()); } } TEST(core, rpc_group_address) { - rpc_group_address g("test_group"); rpc_address addr("127.0.0.1", 8080); rpc_address invalid_addr; rpc_address addr2("127.0.0.1", 8081); - ASSERT_EQ(std::string("test_group"), g.name()); rpc_address t; - t.assign_group((dsn_group_t)(uintptr_t)&g); - ASSERT_EQ(t, g.address()); + t.assign_group("test_group"); + ASSERT_EQ(HOST_TYPE_GROUP, t.type()); + rpc_group_address *g = t.group_address(); + ASSERT_EQ(std::string("test_group"), g->name()); + ASSERT_EQ(1, g->get_count()); // { } - ASSERT_FALSE(g.remove(addr)); - ASSERT_FALSE(g.contains(addr)); - ASSERT_EQ(0u, g.members().size()); - ASSERT_EQ(invalid_addr, g.random_member()); - ASSERT_EQ(invalid_addr, g.next(addr)); - ASSERT_EQ(invalid_addr, g.leader()); - ASSERT_EQ(invalid_addr, g.possible_leader()); + ASSERT_FALSE(g->remove(addr)); + ASSERT_FALSE(g->contains(addr)); + ASSERT_EQ(0u, g->members().size()); + ASSERT_EQ(invalid_addr, g->random_member()); + ASSERT_EQ(invalid_addr, g->next(addr)); + ASSERT_EQ(invalid_addr, g->leader()); + ASSERT_EQ(invalid_addr, g->possible_leader()); // { addr } - ASSERT_TRUE(g.add(addr)); - ASSERT_FALSE(g.add(addr)); - ASSERT_TRUE(g.contains(addr)); - ASSERT_EQ(1u, g.members().size()); - ASSERT_EQ(addr, g.members().at(0)); - ASSERT_EQ(addr, g.random_member()); - ASSERT_EQ(addr, g.next(addr)); - ASSERT_EQ(addr, g.next(invalid_addr)); - ASSERT_EQ(addr, g.next(addr2)); - ASSERT_EQ(invalid_addr, g.leader()); - ASSERT_EQ(addr, g.possible_leader()); + ASSERT_TRUE(g->add(addr)); + ASSERT_FALSE(g->add(addr)); + ASSERT_TRUE(g->contains(addr)); + ASSERT_EQ(1u, g->members().size()); + ASSERT_EQ(addr, g->members().at(0)); + ASSERT_EQ(addr, g->random_member()); + ASSERT_EQ(addr, g->next(addr)); + ASSERT_EQ(addr, g->next(invalid_addr)); + ASSERT_EQ(addr, g->next(addr2)); + ASSERT_EQ(invalid_addr, g->leader()); + ASSERT_EQ(addr, g->possible_leader()); // { addr* } - g.set_leader(addr); - ASSERT_TRUE(g.contains(addr)); - ASSERT_EQ(1u, g.members().size()); - ASSERT_EQ(addr, g.members().at(0)); - ASSERT_EQ(addr, g.leader()); - ASSERT_EQ(addr, g.possible_leader()); + g->set_leader(addr); + ASSERT_TRUE(g->contains(addr)); + ASSERT_EQ(1u, g->members().size()); + ASSERT_EQ(addr, g->members().at(0)); + ASSERT_EQ(addr, g->leader()); + ASSERT_EQ(addr, g->possible_leader()); // { addr, addr2* } - g.set_leader(addr2); - ASSERT_TRUE(g.contains(addr)); - ASSERT_TRUE(g.contains(addr2)); - ASSERT_EQ(2u, g.members().size()); - ASSERT_EQ(addr, g.members().at(0)); - ASSERT_EQ(addr2, g.members().at(1)); - ASSERT_EQ(addr2, g.leader()); - ASSERT_EQ(addr2, g.possible_leader()); - ASSERT_EQ(addr, g.next(addr2)); - ASSERT_EQ(addr2, g.next(addr)); + g->set_leader(addr2); + ASSERT_TRUE(g->contains(addr)); + ASSERT_TRUE(g->contains(addr2)); + ASSERT_EQ(2u, g->members().size()); + ASSERT_EQ(addr, g->members().at(0)); + ASSERT_EQ(addr2, g->members().at(1)); + ASSERT_EQ(addr2, g->leader()); + ASSERT_EQ(addr2, g->possible_leader()); + ASSERT_EQ(addr, g->next(addr2)); + ASSERT_EQ(addr2, g->next(addr)); // { addr, addr2 } - g.set_leader(invalid_addr); - ASSERT_TRUE(g.contains(addr)); - ASSERT_TRUE(g.contains(addr2)); - ASSERT_EQ(2u, g.members().size()); - ASSERT_EQ(addr, g.members().at(0)); - ASSERT_EQ(addr2, g.members().at(1)); - ASSERT_EQ(invalid_addr, g.leader()); + g->set_leader(invalid_addr); + ASSERT_TRUE(g->contains(addr)); + ASSERT_TRUE(g->contains(addr2)); + ASSERT_EQ(2u, g->members().size()); + ASSERT_EQ(addr, g->members().at(0)); + ASSERT_EQ(addr2, g->members().at(1)); + ASSERT_EQ(invalid_addr, g->leader()); // { addr*, addr2 } - g.set_leader(addr); - ASSERT_TRUE(g.contains(addr)); - ASSERT_TRUE(g.contains(addr2)); - ASSERT_EQ(2u, g.members().size()); - ASSERT_EQ(addr, g.members().at(0)); - ASSERT_EQ(addr2, g.members().at(1)); - ASSERT_EQ(addr, g.leader()); + g->set_leader(addr); + ASSERT_TRUE(g->contains(addr)); + ASSERT_TRUE(g->contains(addr2)); + ASSERT_EQ(2u, g->members().size()); + ASSERT_EQ(addr, g->members().at(0)); + ASSERT_EQ(addr2, g->members().at(1)); + ASSERT_EQ(addr, g->leader()); // { uri_addr } - ASSERT_TRUE(g.remove(addr)); - ASSERT_FALSE(g.contains(addr)); - ASSERT_TRUE(g.contains(addr2)); - ASSERT_EQ(1u, g.members().size()); - ASSERT_EQ(addr2, g.members().at(0)); - ASSERT_EQ(invalid_addr, g.leader()); + ASSERT_TRUE(g->remove(addr)); + ASSERT_FALSE(g->contains(addr)); + ASSERT_TRUE(g->contains(addr2)); + ASSERT_EQ(1u, g->members().size()); + ASSERT_EQ(addr2, g->members().at(0)); + ASSERT_EQ(invalid_addr, g->leader()); // { } - ASSERT_TRUE(g.remove(addr2)); - ASSERT_FALSE(g.contains(addr2)); - ASSERT_EQ(0u, g.members().size()); - ASSERT_EQ(invalid_addr, g.leader()); -} - -TEST(core, dsn_group) -{ - dsn_group_t g = dsn_group_build("test_group"); - rpc_address addr("127.0.0.1", 8080); - rpc_address invalid_addr; - rpc_address addr2("127.0.0.1", 8081); - - // { } - ASSERT_EQ(invalid_addr.c_addr(), dsn_group_get_leader(g)); - ASSERT_EQ(invalid_addr.c_addr(), dsn_group_next(g, addr.c_addr())); - - // { addr } - ASSERT_TRUE(dsn_group_add(g, addr.c_addr())); - ASSERT_FALSE(dsn_group_add(g, addr.c_addr())); - ASSERT_EQ(invalid_addr.c_addr(), dsn_group_get_leader(g)); - ASSERT_EQ(addr.c_addr(), dsn_group_next(g, addr.c_addr())); - - // { addr* } - dsn_group_set_leader(g, addr.c_addr()); - ASSERT_EQ(addr.c_addr(), dsn_group_get_leader(g)); - ASSERT_TRUE(dsn_group_is_leader(g, addr.c_addr())); - - // { addr*, addr2 } - ASSERT_TRUE(dsn_group_add(g, addr2.c_addr())); - ASSERT_EQ(addr2.c_addr(), dsn_group_next(g, addr.c_addr())); - ASSERT_EQ(addr.c_addr(), dsn_group_next(g, addr2.c_addr())); - - // { addr2 } - ASSERT_TRUE(dsn_group_remove(g, addr.c_addr())); - ASSERT_EQ(invalid_addr.c_addr(), dsn_group_get_leader(g)); - ASSERT_EQ(addr2.c_addr(), dsn_group_next(g, addr.c_addr())); - - dsn_group_destroy(g); + ASSERT_TRUE(g->remove(addr2)); + ASSERT_FALSE(g->contains(addr2)); + ASSERT_EQ(0u, g->members().size()); + ASSERT_EQ(invalid_addr, g->leader()); } diff --git a/src/core/tests/hpc_io_looper.cpp b/src/core/tests/hpc_io_looper.cpp index b885b3ffc9..ed48e2ef72 100644 --- a/src/core/tests/hpc_io_looper.cpp +++ b/src/core/tests/hpc_io_looper.cpp @@ -44,7 +44,7 @@ #include #include #include -#include "../core/group_address.h" +#include #include "../core/service_engine.h" #include "test_utils.h" #include "../tools/hpc/io_looper.h" diff --git a/src/core/tests/hpc_tail_logger.cpp b/src/core/tests/hpc_tail_logger.cpp index e1a909018d..3f50bad07a 100644 --- a/src/core/tests/hpc_tail_logger.cpp +++ b/src/core/tests/hpc_tail_logger.cpp @@ -44,7 +44,7 @@ #include #include -#include "../core/group_address.h" +#include #include "test_utils.h" #include "../core/service_engine.h" #include "../tools/hpc/hpc_tail_logger.h" diff --git a/src/core/tests/rpc.cpp b/src/core/tests/rpc.cpp index 361088c91e..862db219b9 100644 --- a/src/core/tests/rpc.cpp +++ b/src/core/tests/rpc.cpp @@ -37,31 +37,31 @@ #include #include -#include +#include #include + #include + #include -#include "../core/group_address.h" +#include +#include #include "test_utils.h" -#include typedef std::function rpc_reply_handler; -static ::dsn::rpc_address build_group() +static dsn::rpc_address build_group() { ::dsn::rpc_address server_group; - server_group.assign_group(dsn_group_build("server_group.test")); + server_group.assign_group("server_group.test"); + dsn::rpc_group_address *g = server_group.group_address(); for (uint16_t p = TEST_PORT_BEGIN; p <= TEST_PORT_END; ++p) { - dsn_group_add(server_group.group_handle(), ::dsn::rpc_address("localhost", p).c_addr()); + g->add(dsn::rpc_address("localhost", p)); } - dsn_group_set_leader(server_group.group_handle(), - ::dsn::rpc_address("localhost", TEST_PORT_BEGIN).c_addr()); + g->set_leader(dsn::rpc_address("localhost", TEST_PORT_BEGIN)); return server_group; } -static void destroy_group(::dsn::rpc_address group) { dsn_group_destroy(group.group_handle()); } - static ::dsn::rpc_address dsn_address_from_string(const std::string &str) { size_t pos = str.find(":"); @@ -94,7 +94,7 @@ TEST(core, group_address_talk_to_others) auto typed_callback = [addr](error_code err_code, const std::string &result) { EXPECT_EQ(ERR_OK, err_code); - ::dsn::rpc_address addr_got; + dsn::rpc_address addr_got; ddebug("talk to others callback, result: %s", result.c_str()); EXPECT_TRUE(addr_got.from_string_ipv4(result.c_str())); EXPECT_EQ(TEST_PORT_END, addr_got.port()); @@ -106,7 +106,6 @@ TEST(core, group_address_talk_to_others) nullptr, typed_callback); resp->wait(); - destroy_group(addr); } TEST(core, group_address_change_leader) @@ -128,8 +127,7 @@ TEST(core, group_address_change_leader) // not update leader on forwarding addr.group_address()->set_update_leader_automatically(false); - dsn_group_set_leader(addr.group_handle(), - ::dsn::rpc_address("localhost", TEST_PORT_BEGIN).c_addr()); + addr.group_address()->set_leader(dsn::rpc_address("localhost", TEST_PORT_BEGIN)); resp_task = ::dsn::rpc::call(addr, RPC_TEST_STRING_COMMAND, std::string("expect_talk_to_others"), @@ -137,26 +135,23 @@ TEST(core, group_address_change_leader) typed_callback); resp_task->wait(); if (rpc_err == ERR_OK) { - EXPECT_EQ(::dsn::rpc_address("localhost", TEST_PORT_BEGIN), - ::dsn::rpc_address(dsn_group_get_leader(addr.group_handle()))); + EXPECT_EQ(dsn::rpc_address("localhost", TEST_PORT_BEGIN), + dsn::rpc_address(addr.group_address()->leader())); } + // update leader on forwarding addr.group_address()->set_update_leader_automatically(true); - dsn_group_set_leader(addr.group_handle(), - ::dsn::rpc_address("localhost", TEST_PORT_BEGIN).c_addr()); - resp_task = ::dsn::rpc::call(addr, - RPC_TEST_STRING_COMMAND, - std::string("expect_talk_to_others"), - nullptr, - typed_callback); + addr.group_address()->set_leader(dsn::rpc_address("localhost", TEST_PORT_BEGIN)); + resp_task = dsn::rpc::call(addr, + RPC_TEST_STRING_COMMAND, + std::string("expect_talk_to_others"), + nullptr, + typed_callback); resp_task->wait(); - ddebug("addr.leader=%s", - ::dsn::rpc_address(dsn_group_get_leader(addr.group_handle())).to_string()); + ddebug("addr.leader=%s", addr.group_address()->leader().to_string()); if (rpc_err == ERR_OK) { - EXPECT_EQ(TEST_PORT_END, - ::dsn::rpc_address(dsn_group_get_leader(addr.group_handle())).port()); + EXPECT_EQ(TEST_PORT_END, addr.group_address()->leader().port()); } - destroy_group(addr); } typedef ::dsn::utils::priority_queue<::dsn::task_ptr, 1> task_resp_queue; @@ -173,7 +168,7 @@ static void rpc_group_callback(error_code err, action_on_failure(err, req, resp); dsn::rpc_address group_addr = ((dsn::message_ex *)req)->server_address; - dsn_group_forward_leader(group_addr.group_handle()); + group_addr.group_address()->leader_forward(); auto req_again = dsn_msg_copy(req, false, false); auto call_again = @@ -236,15 +231,13 @@ TEST(core, group_address_no_response_2) }; send_message(addr, std::string("expect_no_reply"), 1, action_on_succeed, action_on_failure); - destroy_group(addr); } TEST(core, send_to_invalid_address) { ::dsn::rpc_address group = build_group(); /* here we assume 10.255.254.253:32766 is not assigned */ - dsn_group_set_leader(group.group_handle(), - ::dsn::rpc_address("10.255.254.253", 32766).c_addr()); + group.group_address()->set_leader(dsn::rpc_address("10.255.254.253", 32766)); rpc_reply_handler action_on_succeed = [](error_code err, dsn_message_t, dsn_message_t resp) { EXPECT_TRUE(err == ERR_OK); @@ -257,5 +250,4 @@ TEST(core, send_to_invalid_address) }; send_message(group, std::string("echo hehehe"), 1, action_on_succeed, action_on_failure); - destroy_group(group); } diff --git a/src/core/tests/service_api_c.cpp b/src/core/tests/service_api_c.cpp index 4550944118..4ea922ad1b 100644 --- a/src/core/tests/service_api_c.cpp +++ b/src/core/tests/service_api_c.cpp @@ -336,7 +336,7 @@ TEST(core, dsn_nfs) dsn_task_add_ref(t); ASSERT_NE(nullptr, t); dsn_file_copy_remote_files( - dsn_address_build("localhost", 20101), ".", files, "nfs_test_dir", false, false, t); + dsn::rpc_address("localhost", 20101), ".", files, "nfs_test_dir", false, false, t); ASSERT_TRUE(dsn_task_wait_timeout(t, 20000)); ASSERT_EQ(r.err, dsn_task_error(t)); ASSERT_EQ(ERR_OK, r.err); @@ -377,7 +377,7 @@ TEST(core, dsn_nfs) dsn_task_add_ref(t); ASSERT_NE(nullptr, t); dsn_file_copy_remote_files( - dsn_address_build("localhost", 20101), ".", files, "nfs_test_dir", true, false, t); + dsn::rpc_address("localhost", 20101), ".", files, "nfs_test_dir", true, false, t); ASSERT_TRUE(dsn_task_wait_timeout(t, 20000)); ASSERT_EQ(r.err, dsn_task_error(t)); ASSERT_EQ(ERR_OK, r.err); @@ -403,7 +403,7 @@ TEST(core, dsn_nfs) 0); dsn_task_add_ref(t); ASSERT_NE(nullptr, t); - dsn_file_copy_remote_directory(dsn_address_build("localhost", 20101), + dsn_file_copy_remote_directory(dsn::rpc_address("localhost", 20101), "nfs_test_dir", "nfs_test_dir_copy", false, diff --git a/src/core/tests/test_utils.h b/src/core/tests/test_utils.h index 3f2a9b0e70..708f6f9e87 100644 --- a/src/core/tests/test_utils.h +++ b/src/core/tests/test_utils.h @@ -90,10 +90,10 @@ class test_client : public ::dsn::serverlet, public ::dsn::service_ dsn::rpc_address next_addr = dsn::service_app::primary_address(); if (next_addr.port() != TEST_PORT_END) { next_addr.assign_ipv4(next_addr.ip(), next_addr.port() + 1); - ddebug("test_client_server, talk_to_others: %s", next_addr.to_std_string().c_str()); - dsn_rpc_forward(message, next_addr.c_addr()); + ddebug("test_client_server, talk_to_others: %s", next_addr.to_string()); + dsn_rpc_forward(message, next_addr); } else { - ddebug("test_client_server, talk_to_me: %s", next_addr.to_std_string().c_str()); + ddebug("test_client_server, talk_to_me: %s", next_addr.to_string()); reply(message, next_addr.to_std_string()); } } else if (command == "expect_no_reply") { diff --git a/src/core/tools/common/dsn_message_parser.cpp b/src/core/tools/common/dsn_message_parser.cpp index 853bbe3d7b..98c50a5ac9 100644 --- a/src/core/tools/common/dsn_message_parser.cpp +++ b/src/core/tools/common/dsn_message_parser.cpp @@ -86,13 +86,11 @@ message_ex *dsn_message_parser::get_message_on_receive(message_reader *reader, msg->hdr_format = NET_HDR_DSN; return msg; } - } else // buf_len < msg_sz - { + } else { // buf_len < msg_sz read_next = msg_sz - buf_len; return nullptr; } - } else // buf_len < sizeof(message_header) - { + } else { // buf_len < sizeof(message_header) read_next = sizeof(message_header) - buf_len; return nullptr; } diff --git a/src/core/tools/common/explorer.cpp b/src/core/tools/common/explorer.cpp index 6ba0419782..76700a257b 100644 --- a/src/core/tools/common/explorer.cpp +++ b/src/core/tools/common/explorer.cpp @@ -56,7 +56,7 @@ class task_explorer { ++_msg_count; utils::auto_lock l(_lock); - ++_ins[msg->header->from_address.c_addr().u.value][caller]; + ++_ins[msg->header->from_address.value()][caller]; } void on_local_call(dsn::task_code callee) @@ -252,8 +252,8 @@ class per_node_task_explorer // with external partners else { - dsn_address_t remote; - remote.u.value = kv.first; + dsn::rpc_address remote; + remote.value() = kv.first; ss << "\t" << kv.first << " [label=\"" << rpc_address(remote).to_std_string() << "\"];" << std::endl; @@ -290,7 +290,7 @@ class all_task_explorer : public utils::singleton service_app *se = all[i]; auto &exp = _explorers[se->info().entity_id]; exp.set_id(se->info().entity_id, se->primary_address(), se->info().full_name); - _explorers_by_addr[exp.address().c_addr().u.value] = &exp; + _explorers_by_addr[exp.address().value()] = &exp; } } @@ -327,7 +327,7 @@ class all_task_explorer : public utils::singleton for (auto &exp : _explorers) { std::unordered_set ots; std::unordered_set *ptr = &ots; - auto it = outgoing_tasks.find(exp.address().c_addr().u.value); + auto it = outgoing_tasks.find(exp.address().value()); if (it != outgoing_tasks.end()) { ptr = &it->second; } diff --git a/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp b/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp index a576674c11..950dc0e47d 100644 --- a/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp +++ b/src/dist/failure_detector_multimaster/failure_detector_multimaster.cpp @@ -33,6 +33,8 @@ * xxxx-xx-xx, author, fix bug about xxx */ +#include +#include #include namespace dsn { @@ -43,29 +45,27 @@ slave_failure_detector_with_multimaster::slave_failure_detector_with_multimaster std::function &&master_disconnected_callback, std::function &&master_connected_callback) { - _meta_servers.assign_group(dsn_group_build("meta-servers")); + _meta_servers.assign_group("meta-servers"); for (auto &s : meta_servers) { - dsn_group_add(_meta_servers.group_handle(), s.c_addr()); + _meta_servers.group_address()->add(s); } - dsn_group_set_leader(_meta_servers.group_handle(), - meta_servers[random32(0, (uint32_t)meta_servers.size() - 1)].c_addr()); + _meta_servers.group_address()->set_leader( + meta_servers[random32(0, (uint32_t)meta_servers.size() - 1)]); + // ATTENTION: here we disable dsn_group_set_update_leader_automatically to avoid // failure detecting logic is affected by rpc failure or rpc forwarding. - dsn_group_set_update_leader_automatically(_meta_servers.group_handle(), false); + _meta_servers.group_address()->set_update_leader_automatically(false); _master_disconnected_callback = std::move(master_disconnected_callback); _master_connected_callback = std::move(master_connected_callback); } -slave_failure_detector_with_multimaster::~slave_failure_detector_with_multimaster(void) -{ - dsn_group_destroy(_meta_servers.group_handle()); -} +slave_failure_detector_with_multimaster::~slave_failure_detector_with_multimaster(void) {} void slave_failure_detector_with_multimaster::set_leader_for_test(rpc_address meta) { - dsn_group_set_leader(_meta_servers.group_handle(), meta.c_addr()); + _meta_servers.group_address()->set_leader(meta); } void slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err, @@ -85,15 +85,15 @@ void slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err, if (!failure_detector::end_ping_internal(err, ack)) return; - dassert(ack.this_node == dsn_group_get_leader(_meta_servers.group_handle()), + dassert(ack.this_node == _meta_servers.group_address()->leader(), "ack.this_node[%s] vs meta_servers.leader[%s]", ack.this_node.to_string(), - dsn_address_to_string(dsn_group_get_leader(_meta_servers.group_handle()))); + _meta_servers.group_address()->leader().to_string()); if (ERR_OK != err) { - rpc_address next = dsn_group_next(_meta_servers.group_handle(), ack.this_node.c_addr()); + rpc_address next = _meta_servers.group_address()->next(ack.this_node); if (next != ack.this_node) { - dsn_group_set_leader(_meta_servers.group_handle(), next.c_addr()); + _meta_servers.group_address()->set_leader(next); // do not start next send_beacon() immediately to avoid send rpc too frequently switch_master(ack.this_node, next, 1000); } @@ -101,14 +101,14 @@ void slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err, if (ack.is_master) { // do nothing } else if (ack.primary_node.is_invalid()) { - rpc_address next = dsn_group_next(_meta_servers.group_handle(), ack.this_node.c_addr()); + rpc_address next = _meta_servers.group_address()->next(ack.this_node); if (next != ack.this_node) { - dsn_group_set_leader(_meta_servers.group_handle(), next.c_addr()); + _meta_servers.group_address()->set_leader(next); // do not start next send_beacon() immediately to avoid send rpc too frequently switch_master(ack.this_node, next, 1000); } } else { - dsn_group_set_leader(_meta_servers.group_handle(), ack.primary_node.c_addr()); + _meta_servers.group_address()->set_leader(ack.primary_node); // start next send_beacon() immediately because the leader is possibly right. switch_master(ack.this_node, ack.primary_node, 0); } @@ -120,7 +120,7 @@ void slave_failure_detector_with_multimaster::on_master_disconnected( const std::vector<::dsn::rpc_address> &nodes) { bool primary_disconnected = false; - rpc_address leader = dsn_group_get_leader(_meta_servers.group_handle()); + rpc_address leader = _meta_servers.group_address()->leader(); for (auto it = nodes.begin(); it != nodes.end(); ++it) { if (leader == *it) primary_disconnected = true; @@ -137,11 +137,7 @@ void slave_failure_detector_with_multimaster::on_master_connected(::dsn::rpc_add * well, this is called in on_ping_internal, which is called by rep::end_ping. * So this function is called in the lock context of fd::_lock */ - bool is_primary = false; - { - is_primary = dsn_group_is_leader(_meta_servers.group_handle(), node.c_addr()); - } - + bool is_primary = (_meta_servers.group_address()->leader() == node); if (is_primary) { _master_connected_callback(); } diff --git a/src/dist/replication/client_lib/fs_manager.cpp b/src/dist/replication/client_lib/fs_manager.cpp index dcb332cb2b..1dcbe83899 100644 --- a/src/dist/replication/client_lib/fs_manager.cpp +++ b/src/dist/replication/client_lib/fs_manager.cpp @@ -149,7 +149,7 @@ dsn::error_code fs_manager::initialize(const std::vector &data_dirs dir_node *n = new dir_node(tags[i], norm_path); _dir_nodes.emplace_back(n); ddebug("%s: mark data dir(%s) as tag(%s)", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), norm_path.c_str(), tags[i].c_str()); } @@ -176,7 +176,7 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir) dir_node *n = get_dir_node(pid_dir); if (nullptr == n) { derror("%s: dir(%s) of gpid(%d.%d) haven't registered", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), pid_dir.c_str(), pid.get_app_id(), pid.get_partition_index()); @@ -186,13 +186,13 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir) auto result = replicas_for_app.emplace(pid); if (!result.second) { dwarn("%s: gpid(%d.%d) already in the dir_node(%s)", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), pid.get_app_id(), pid.get_partition_index(), n->tag.c_str()); } else { ddebug("%s: add gpid(%d.%d) to dir_node(%s)", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), pid.get_app_id(), pid.get_partition_index(), n->tag.c_str()); @@ -234,7 +234,7 @@ void fs_manager::allocate_dir(const gpid &pid, const std::string &type, /*out*/ ddebug( "%s: put pid(%d.%d) to dir(%s), which has %u replicas of current app, %u replicas totally", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), pid.get_app_id(), pid.get_partition_index(), selected->tag.c_str(), @@ -258,7 +258,7 @@ void fs_manager::remove_replica(const gpid &pid) n->tag.c_str()); if (r != 0) { ddebug("%s: remove gpid(%d.%d) from dir(%s)", - dsn_address_to_string(dsn_primary_address()), + dsn_primary_address().to_string(), pid.get_app_id(), pid.get_partition_index(), n->tag.c_str()); diff --git a/src/dist/replication/ddl_lib/replication_ddl_client.cpp b/src/dist/replication/ddl_lib/replication_ddl_client.cpp index b50d97d259..26055a8126 100644 --- a/src/dist/replication/ddl_lib/replication_ddl_client.cpp +++ b/src/dist/replication/ddl_lib/replication_ddl_client.cpp @@ -33,6 +33,8 @@ */ #include #include + +#include #include #include #include @@ -143,16 +145,13 @@ std::string replication_ddl_client::list_hostname_from_ip_port(const char *ip_po replication_ddl_client::replication_ddl_client(const std::vector &meta_servers) { - _meta_server.assign_group(dsn_group_build("meta-servers")); + _meta_server.assign_group("meta-servers"); for (auto &m : meta_servers) { - dsn_group_add(_meta_server.group_handle(), m.c_addr()); + _meta_server.group_address()->add(m); } } -replication_ddl_client::~replication_ddl_client() -{ - dsn_group_destroy(_meta_server.group_handle()); -} +replication_ddl_client::~replication_ddl_client() {} dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_name, int partition_count, diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index fc064d08fd..ea9a989814 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -177,7 +177,7 @@ void replica::response_client_message(bool is_read, dsn_message_t request, error "%s: reply client %s to %s, err = %s", name(), is_read ? "read" : "write", - dsn_address_to_string(dsn_msg_from_address(request)), + dsn_msg_from_address(request).to_string(), error.to_string()); dsn_rpc_reply(dsn_msg_create_response(request), error); diff --git a/src/dist/replication/lib/replica_2pc.cpp b/src/dist/replication/lib/replica_2pc.cpp index 71c6521592..1106be6101 100644 --- a/src/dist/replication/lib/replica_2pc.cpp +++ b/src/dist/replication/lib/replica_2pc.cpp @@ -69,7 +69,7 @@ void replica::on_client_write(task_code code, dsn_message_t request) dinfo("%s: got write request from %s", name(), - dsn_address_to_string(dsn_msg_from_address(request))); + dsn_msg_from_address(request).to_string()); auto mu = _primary_states.write_queue.add_work(code, request, this); if (mu) { init_prepare(mu, false); diff --git a/src/dist/replication/lib/replica_config.cpp b/src/dist/replication/lib/replica_config.cpp index 8b09a9ab52..a1c4bc0e79 100644 --- a/src/dist/replication/lib/replica_config.cpp +++ b/src/dist/replication/lib/replica_config.cpp @@ -459,8 +459,7 @@ void replica::on_update_configuration_on_meta_server_reply( err, request, response, std::move(req2)); }, get_gpid().thread_hash()); - dsn_rpc_call(target.c_addr(), - _primary_states.reconfiguration_task->native_handle()); + dsn_rpc_call(target, _primary_states.reconfiguration_task->native_handle()); dsn_msg_release_ref(request); }, get_gpid().thread_hash(), diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 918ff7def7..9ba448e63b 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -1312,7 +1312,7 @@ void replica_stub::response_client_error(gpid gpid, gpid.get_partition_index(), _primary_address.to_string(), is_read ? "read" : "write", - dsn_address_to_string(dsn_msg_from_address(request)), + dsn_msg_from_address(request).to_string(), error.to_string()); } else { derror("%d.%d@%s: reply client %s to %s, err = %s", @@ -1320,7 +1320,7 @@ void replica_stub::response_client_error(gpid gpid, gpid.get_partition_index(), _primary_address.to_string(), is_read ? "read" : "write", - dsn_address_to_string(dsn_msg_from_address(request)), + dsn_msg_from_address(request).to_string(), error.to_string()); } dsn_rpc_reply(dsn_msg_create_response(request), error); diff --git a/src/dist/replication/meta_server/meta_service.cpp b/src/dist/replication/meta_server/meta_service.cpp index a68a57581d..5facec851f 100644 --- a/src/dist/replication/meta_server/meta_service.cpp +++ b/src/dist/replication/meta_server/meta_service.cpp @@ -323,7 +323,7 @@ int meta_service::check_leader(dsn_message_t req) dinfo("leader address: %s", leader.to_string()); if (!leader.is_invalid()) { - dsn_rpc_forward(req, leader.c_addr()); + dsn_rpc_forward(req, leader); return 0; } else { return -1; diff --git a/src/dist/replication/meta_server/meta_service.h b/src/dist/replication/meta_server/meta_service.h index 11014eb090..caa1ac27fe 100644 --- a/src/dist/replication/meta_server/meta_service.h +++ b/src/dist/replication/meta_server/meta_service.h @@ -95,11 +95,11 @@ class meta_service : public serverlet virtual void reply_message(dsn_message_t, dsn_message_t response) { dsn_rpc_reply(response); } virtual void send_message(const rpc_address &target, dsn_message_t request) { - dsn_rpc_call_one_way(target.c_addr(), request); + dsn_rpc_call_one_way(target, request); } virtual void send_request(dsn_message_t /*req*/, const rpc_address &target, task_ptr callback) { - dsn_rpc_call(target.c_addr(), callback->native_handle()); + dsn_rpc_call(target, callback->native_handle()); } // these two callbacks are running in fd's thread_pool, and in fd's lock diff --git a/src/dist/replication/test/simple_kv/client.cpp b/src/dist/replication/test/simple_kv/client.cpp index 12d7efff6c..984f49dd80 100644 --- a/src/dist/replication/test/simple_kv/client.cpp +++ b/src/dist/replication/test/simple_kv/client.cpp @@ -35,6 +35,7 @@ #include "client.h" #include "case.h" +#include #include #include @@ -48,13 +49,7 @@ simple_kv_client_app::simple_kv_client_app(const service_app_info *info) { } -simple_kv_client_app::~simple_kv_client_app() -{ - stop(); - if (_meta_server_group.group_handle()) { - dsn_group_destroy(_meta_server_group.group_handle()); - } -} +simple_kv_client_app::~simple_kv_client_app() { stop(); } ::dsn::error_code simple_kv_client_app::start(const std::vector &args) { @@ -64,13 +59,14 @@ ::dsn::error_code simple_kv_client_app::start(const std::vector &ar std::vector meta_servers; replica_helper::load_meta_servers(meta_servers); - _meta_server_group.assign_group(dsn_group_build("meta-servers")); + _meta_server_group.assign_group("meta_servers"); + rpc_group_address *g = _meta_server_group.group_address(); for (auto &ms : meta_servers) { - dsn_group_add(_meta_server_group.group_handle(), ms.c_addr()); + g->add(ms); } // argv[1]: e.g., dsn://mycluster/simple-kv.instance0 - _service_addr = url_host_address(args[1].c_str()); + _service_addr.assign_uri(args[1].c_str()); _simple_kv_client.reset(new simple_kv_client(_service_addr)); dsn::tasking::enqueue(LPC_SIMPLE_KV_TEST, this, std::bind(&simple_kv_client_app::run, this)); @@ -155,7 +151,7 @@ void simple_kv_client_app::send_config_to_meta(const rpc_address &receiver, dsn::marshall(req, request); - dsn_rpc_call_one_way(_meta_server_group.c_addr(), req); + dsn_rpc_call_one_way(_meta_server_group, req); } struct read_context diff --git a/src/dist/replication/test/simple_kv/client.h b/src/dist/replication/test/simple_kv/client.h index 7cc6de288d..9b818f8143 100644 --- a/src/dist/replication/test/simple_kv/client.h +++ b/src/dist/replication/test/simple_kv/client.h @@ -61,7 +61,7 @@ class simple_kv_client_app : public ::dsn::service_app, public virtual ::dsn::cl private: std::unique_ptr _simple_kv_client; rpc_address _meta_server_group; - url_host_address _service_addr; + rpc_address _service_addr; }; } } diff --git a/src/tests/dsn/failure_detector.cpp b/src/tests/dsn/failure_detector.cpp index 7e91df44f6..c234d23162 100644 --- a/src/tests/dsn/failure_detector.cpp +++ b/src/tests/dsn/failure_detector.cpp @@ -256,7 +256,7 @@ void worker_set_leader(test_worker *worker, int leader_contact) void clear(test_worker *worker, std::vector masters) { - rpc_address leader = dsn_group_get_leader(worker->fd()->get_servers().group_handle()); + rpc_address leader = worker->fd()->get_servers().group_address()->leader(); config_master_message msg; msg.master = leader; From 82e5a89a123baf19e4f1f85d6a646ca0e8969a98 Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Tue, 10 Apr 2018 13:39:04 +0800 Subject: [PATCH 2/7] rpc_address: small code refactor remove redudant clear function, rename sInvalid to s_invalid_address --- include/dsn/tool-api/group_address.h | 8 ++--- include/dsn/tool-api/rpc_address.h | 45 +++++++++++++--------------- src/core/core/address.cpp | 15 +++++----- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/include/dsn/tool-api/group_address.h b/include/dsn/tool-api/group_address.h index 715d2809a9..8bc61fcaee 100644 --- a/include/dsn/tool-api/group_address.h +++ b/include/dsn/tool-api/group_address.h @@ -59,14 +59,14 @@ class rpc_group_address : public dsn::ref_counter rpc_address random_member() const { alr_t l(_lock); - return _members.empty() ? rpc_address::sInvalid + return _members.empty() ? rpc_address::s_invalid_address : _members[dsn_random32(0, (uint32_t)_members.size() - 1)]; } rpc_address next(rpc_address current) const; rpc_address leader() const { alr_t l(_lock); - return _leader_index >= 0 ? _members[_leader_index] : rpc_address::sInvalid; + return _leader_index >= 0 ? _members[_leader_index] : rpc_address::s_invalid_address; } void leader_forward(); rpc_address possible_leader(); @@ -156,7 +156,7 @@ inline rpc_address rpc_group_address::possible_leader() { alr_t l(_lock); if (_members.empty()) - return rpc_address::sInvalid; + return rpc_address::s_invalid_address; if (_leader_index == -1) _leader_index = dsn_random32(0, (uint32_t)_members.size() - 1); return _members[_leader_index]; @@ -192,7 +192,7 @@ inline rpc_address rpc_group_address::next(rpc_address current) const { alr_t l(_lock); if (_members.empty()) - return rpc_address::sInvalid; + return rpc_address::s_invalid_address; if (current.is_invalid()) return _members[dsn_random32(0, (uint32_t)_members.size() - 1)]; else { diff --git a/include/dsn/tool-api/rpc_address.h b/include/dsn/tool-api/rpc_address.h index 07836d3130..953b8d16c8 100644 --- a/include/dsn/tool-api/rpc_address.h +++ b/include/dsn/tool-api/rpc_address.h @@ -23,17 +23,6 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ - -/* - * Description: - * define address helper routines - * - * Revision history: - * July, 2015, @imzhenyu (Zhenyu Guo), first version - * Aug., 2015, @imzhenyu (Zhenyu Guo), add group and uri address support - * xxxx-xx-xx, author, fix bug about xxx - */ - #pragma once #include @@ -63,12 +52,13 @@ class rpc_uri_address; class rpc_address { public: - static const rpc_address sInvalid; + static const rpc_address s_invalid_address; static uint32_t ipv4_from_host(const char *hostname); static uint32_t ipv4_from_network_interface(const char *network_interface); + ~rpc_address(); - rpc_address() = default; + constexpr rpc_address() = default; rpc_address(const rpc_address &another); @@ -85,9 +75,11 @@ class rpc_address rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); } + rpc_address clone() const; + void assign_ipv4(uint32_t ip, uint16_t port) { - clear(); + set_invalid(); _addr.v4.type = HOST_TYPE_IPV4; _addr.v4.ip = ip; _addr.v4.port = port; @@ -95,7 +87,7 @@ class rpc_address void assign_ipv4(const char *host, uint16_t port) { - clear(); + set_invalid(); _addr.v4.type = HOST_TYPE_IPV4; _addr.v4.ip = rpc_address::ipv4_from_host(host); _addr.v4.port = port; @@ -103,22 +95,23 @@ class rpc_address void assign_ipv4_local_address(const char *network_interface, uint16_t port) { - clear(); + set_invalid(); _addr.v4.type = HOST_TYPE_IPV4; _addr.v4.ip = rpc_address::ipv4_from_network_interface(network_interface); _addr.v4.port = port; } void assign_uri(const char *host_uri); + void assign_group(const char *name); - rpc_address clone() const; const char *to_string() const; std::string to_std_string() const { return std::string(to_string()); } + bool from_string_ipv4(const char *s) { - clear(); + set_invalid(); std::string str = std::string(s); auto pos = str.find_last_of(':'); if (pos == std::string::npos) @@ -140,16 +133,25 @@ class rpc_address uint64_t &value() { return _addr.value; } dsn_host_type_t type() const { return (dsn_host_type_t)_addr.v4.type; } + uint32_t ip() const { return (uint32_t)_addr.v4.ip; } + uint16_t port() const { return (uint16_t)_addr.v4.port; } + void set_port(uint16_t port) { _addr.v4.port = port; } + rpc_group_address *group_address() const { return (rpc_group_address *)(uintptr_t)_addr.group.group; } + rpc_uri_address *uri_address() const { return (rpc_uri_address *)(uintptr_t)_addr.uri.uri; } + bool is_invalid() const { return _addr.v4.type == HOST_TYPE_INVALID; } - void set_invalid() { clear(); } + + // before you assign new value, must call set_invalid() to release original value + // and you MUST ensure that _addr is INITIALIZED before you call this function + void set_invalid(); bool operator==(::dsn::rpc_address r) const { @@ -192,11 +194,6 @@ class rpc_address uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; #endif -private: - // before you assign new value, must call clear() to release original value - // and you MUST ensure that _addr is INITIALIZED before you call this function - void clear(); - private: union { diff --git a/src/core/core/address.cpp b/src/core/core/address.cpp index 0625858dee..bc56582e03 100644 --- a/src/core/core/address.cpp +++ b/src/core/core/address.cpp @@ -66,7 +66,8 @@ #include namespace dsn { -const rpc_address rpc_address::sInvalid; + +const rpc_address rpc_address::s_invalid_address; #ifdef _WIN32 static void net_init() @@ -155,7 +156,7 @@ DSN_API uint32_t rpc_address::ipv4_from_network_interface(const char *network_in return ret; } -rpc_address::~rpc_address() { clear(); } +rpc_address::~rpc_address() { set_invalid(); } rpc_address::rpc_address(const rpc_address &another) { *this = another; } @@ -165,7 +166,7 @@ rpc_address &rpc_address::operator=(const rpc_address &another) // avoid memory leak return *this; } - clear(); + set_invalid(); _addr = another._addr; switch (another.type()) { case HOST_TYPE_GROUP: @@ -182,7 +183,7 @@ rpc_address &rpc_address::operator=(const rpc_address &another) void rpc_address::assign_uri(const char *host_uri) { - clear(); + set_invalid(); _addr.uri.type = HOST_TYPE_URI; dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(host_uri); // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct @@ -192,7 +193,7 @@ void rpc_address::assign_uri(const char *host_uri) void rpc_address::assign_group(const char *name) { - clear(); + set_invalid(); _addr.group.type = HOST_TYPE_GROUP; dsn::rpc_group_address *addr = new dsn::rpc_group_address(name); // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct @@ -217,13 +218,13 @@ rpc_address rpc_address::clone() const new_address._addr.group.group = (uint64_t)addr; new_address._addr.group.type = HOST_TYPE_GROUP; } else { - new_address.clear(); + new_address.set_invalid(); } return new_address; } -void rpc_address::clear() +void rpc_address::set_invalid() { switch (type()) { case HOST_TYPE_GROUP: From ba089ace59a7fbd09ad588abdd1b42aeaab81cdc Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Tue, 10 Apr 2018 14:11:48 +0800 Subject: [PATCH 3/7] rpc_address: remove clone, rename address.cpp --- include/dsn/tool-api/rpc_address.h | 2 -- .../core/{address.cpp => rpc_address.cpp} | 23 ------------------- 2 files changed, 25 deletions(-) rename src/core/core/{address.cpp => rpc_address.cpp} (90%) diff --git a/include/dsn/tool-api/rpc_address.h b/include/dsn/tool-api/rpc_address.h index 953b8d16c8..f01785950c 100644 --- a/include/dsn/tool-api/rpc_address.h +++ b/include/dsn/tool-api/rpc_address.h @@ -75,8 +75,6 @@ class rpc_address rpc_address(const char *host, uint16_t port) { assign_ipv4(host, port); } - rpc_address clone() const; - void assign_ipv4(uint32_t ip, uint16_t port) { set_invalid(); diff --git a/src/core/core/address.cpp b/src/core/core/rpc_address.cpp similarity index 90% rename from src/core/core/address.cpp rename to src/core/core/rpc_address.cpp index bc56582e03..e4efb8eb21 100644 --- a/src/core/core/address.cpp +++ b/src/core/core/rpc_address.cpp @@ -201,29 +201,6 @@ void rpc_address::assign_group(const char *name) _addr.group.group = (uint64_t)addr; } -rpc_address rpc_address::clone() const -{ - rpc_address new_address; - - if (type() == HOST_TYPE_IPV4) { - new_address._addr = _addr; - } else if (type() == HOST_TYPE_URI) { - dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(*uri_address()); - addr->add_ref(); - new_address._addr.uri.uri = (uint64_t)addr; - new_address._addr.uri.type = HOST_TYPE_URI; - } else if (type() == HOST_TYPE_GROUP) { - dsn::rpc_group_address *addr = new dsn::rpc_group_address(*group_address()); - addr->add_ref(); - new_address._addr.group.group = (uint64_t)addr; - new_address._addr.group.type = HOST_TYPE_GROUP; - } else { - new_address.set_invalid(); - } - - return new_address; -} - void rpc_address::set_invalid() { switch (type()) { From dd7d4e9bf3cd05fdb14445564f7c40e5f42d247f Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Tue, 10 Apr 2018 15:02:11 +0800 Subject: [PATCH 4/7] rpc_address: remove useless WIN32 macros. rpc_message: fix comments --- src/core/core/rpc_address.cpp | 57 +---------------------------------- src/core/core/rpc_message.cpp | 15 ++++++--- src/core/tests/address.cpp | 2 -- 3 files changed, 11 insertions(+), 63 deletions(-) diff --git a/src/core/core/rpc_address.cpp b/src/core/core/rpc_address.cpp index e4efb8eb21..72863eab4a 100644 --- a/src/core/core/rpc_address.cpp +++ b/src/core/core/rpc_address.cpp @@ -24,37 +24,12 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - -#ifdef _WIN32 - -#define _WINSOCK_DEPRECATED_NO_WARNINGS 1 - -#include -#include -#include -#pragma comment(lib, "ws2_32.lib") - -#else #include #include #include #include #include -#if defined(__FreeBSD__) -#include -#endif - -#endif - #include #include @@ -69,21 +44,6 @@ namespace dsn { const rpc_address rpc_address::s_invalid_address; -#ifdef _WIN32 -static void net_init() -{ - static std::once_flag flag; - static bool flag_inited = false; - if (!flag_inited) { - std::call_once(flag, [&]() { - WSADATA wsaData; - WSAStartup(MAKEWORD(2, 2), &wsaData); - flag_inited = true; - }); - } -} -#endif - /*static*/ uint32_t rpc_address::ipv4_from_host(const char *name) { @@ -93,13 +53,7 @@ uint32_t rpc_address::ipv4_from_host(const char *name) addr.sin_family = AF_INET; if ((addr.sin_addr.s_addr = inet_addr(name)) == (unsigned int)(-1)) { hostent *hp = ::gethostbyname(name); - int err = -#ifdef _WIN32 - (int)::WSAGetLastError() -#else - h_errno -#endif - ; + int err = h_errno; if (hp == nullptr) { derror("gethostbyname failed, name = %s, err = %d.", name, err); @@ -222,23 +176,14 @@ const char *rpc_address::to_string() const char *p = bf.next(); auto sz = bf.get_chunk_size(); struct in_addr net_addr; -#ifdef _WIN32 - char *ip_str; -#else int ip_len; -#endif switch (_addr.v4.type) { case HOST_TYPE_IPV4: net_addr.s_addr = htonl(ip()); -#ifdef _WIN32 - ip_str = inet_ntoa(net_addr); - snprintf_p(p, sz, "%s:%hu", ip_str, (uint16_t)addr.u.v4.port); -#else inet_ntop(AF_INET, &net_addr, p, sz); ip_len = strlen(p); snprintf_p(p + ip_len, sz - ip_len, ":%hu", port()); -#endif break; case HOST_TYPE_URI: p = (char *)uri_address()->uri(); diff --git a/src/core/core/rpc_message.cpp b/src/core/core/rpc_message.cpp index 8ca71e32ce..35c8a52bfb 100644 --- a/src/core/core/rpc_message.cpp +++ b/src/core/core/rpc_message.cpp @@ -214,11 +214,16 @@ message_ex::message_ex() message_ex::~message_ex() { - // when recv a request, message_header is hidden ahead of buffer(see@create_receive_message - // function), if you call copy_and_prepare_send() function, then a new request will be create, - // but new request share the same header with the old_request, so if old_request release header, - // then new request's header is invalid, so we don't release_buffer_header(), and there will not - // lead any problem, see@ Attention of message_header + // when receiving a request, the lifetime of message_header object is managed by + // vector "buffers" (see@create_receive_message function). + // if you call copy_and_prepare_send() function, then a new request will be created, + // but new request shares the same header with the old_request. + // so if old_request releases header, + // then new request's header will be invalid. + // + // so we don't release_buffer_header(). + // however, this won't lead to any memory leak problems as long as the header is a POD type + // see@ Attention of message_header // release_buffer_header(); if (!_is_read) { diff --git a/src/core/tests/address.cpp b/src/core/tests/address.cpp index 8c444cdfe4..75f0ed0002 100644 --- a/src/core/tests/address.cpp +++ b/src/core/tests/address.cpp @@ -78,11 +78,9 @@ TEST(core, rpc_address_ipv4_from_host) TEST(core, rpc_address_ipv4_from_network_interface) { -#ifndef _WIN32 ASSERT_EQ(host_ipv4(127, 0, 0, 1), rpc_address::ipv4_from_network_interface("lo")); ASSERT_EQ(host_ipv4(0, 0, 0, 0), rpc_address::ipv4_from_network_interface("not_exist_interface")); -#endif } TEST(core, rpc_address_to_string) From 4b535d3a3670bb7d3992a5dc91895edd5b49a3fe Mon Sep 17 00:00:00 2001 From: WeijieSun Date: Tue, 10 Apr 2018 15:07:21 +0800 Subject: [PATCH 5/7] rpc_address: fix comments --- src/core/core/rpc_address.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/core/rpc_address.cpp b/src/core/core/rpc_address.cpp index 72863eab4a..ac3abfaf2d 100644 --- a/src/core/core/rpc_address.cpp +++ b/src/core/core/rpc_address.cpp @@ -67,10 +67,10 @@ uint32_t rpc_address::ipv4_from_host(const char *name) return (uint32_t)ntohl(addr.sin_addr.s_addr); } -/*static*/ // if network_interface is "", then return the first // site-local ipv4 address: 10.*.*.*, 172.16.*.*, 192.168.*.* -DSN_API uint32_t rpc_address::ipv4_from_network_interface(const char *network_interface) +/*static*/ +uint32_t rpc_address::ipv4_from_network_interface(const char *network_interface) { uint32_t ret = 0; @@ -140,7 +140,7 @@ void rpc_address::assign_uri(const char *host_uri) set_invalid(); _addr.uri.type = HOST_TYPE_URI; dsn::rpc_uri_address *addr = new dsn::rpc_uri_address(host_uri); - // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct + // take the lifetime of rpc_uri_address, release_ref when change value or call destructor addr->add_ref(); _addr.uri.uri = (uint64_t)addr; } @@ -150,7 +150,7 @@ void rpc_address::assign_group(const char *name) set_invalid(); _addr.group.type = HOST_TYPE_GROUP; dsn::rpc_group_address *addr = new dsn::rpc_group_address(name); - // take the lifetime of rpc_uri_address, release_ref when change value or call deconstruct + // take the lifetime of rpc_uri_address, release_ref when change value or call destructor addr->add_ref(); _addr.group.group = (uint64_t)addr; } From 16c0d16784b31e02185825f0916675b785def7f0 Mon Sep 17 00:00:00 2001 From: Weijie Sun Date: Tue, 10 Apr 2018 21:46:38 +0800 Subject: [PATCH 6/7] rpc_message: fix comments about message header --- src/core/core/rpc_message.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/core/core/rpc_message.cpp b/src/core/core/rpc_message.cpp index 35c8a52bfb..10a1c95fca 100644 --- a/src/core/core/rpc_message.cpp +++ b/src/core/core/rpc_message.cpp @@ -221,8 +221,9 @@ message_ex::~message_ex() // so if old_request releases header, // then new request's header will be invalid. // - // so we don't release_buffer_header(). - // however, this won't lead to any memory leak problems as long as the header is a POD type + // so we don't call release_buffer_header(). + // however, this won't lead to any memory leak problems coz we can TREAT message_header + // as a POD type (the constructors of gpid and rpc_address are trival) // see@ Attention of message_header // release_buffer_header(); @@ -478,15 +479,17 @@ void message_ex::prepare_buffer_header() this->_rw_offset = (int)sizeof(message_header); this->buffers.push_back(buffer); - // here, we should call placement new, because message_header contain variable that is not - // POD-type, such as rpc_address, so we should call these variable's construct + // here we should call placement new, + // so the gpid & rpc_address can be initialized new (ptr)(message_header); + header = (message_header *)ptr; } void message_ex::release_buffer_header() { - // message_header contain variable that is not POD-type, so we call these variable's destructor + // we should call destructor explicitly + // as the header is constructed with placement new, see@prepare_buffer_header header->~message_header(); } From 46a98c222071c527fa41878caa56bde9361a6912 Mon Sep 17 00:00:00 2001 From: Weijie Sun Date: Wed, 11 Apr 2018 11:05:59 +0800 Subject: [PATCH 7/7] rpc_message: fix comments --- include/dsn/tool-api/rpc_message.h | 15 ++++++++++----- src/core/core/rpc_message.cpp | 28 +++++++++++++++++----------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/include/dsn/tool-api/rpc_message.h b/include/dsn/tool-api/rpc_message.h index f1ce3959fd..7996339c5a 100644 --- a/include/dsn/tool-api/rpc_message.h +++ b/include/dsn/tool-api/rpc_message.h @@ -80,9 +80,9 @@ typedef struct message_header dsn_msg_context_t context; // Attention: - // here, from_address must be IPv4 address, namely we can regard from_address as a - // POD-type structure, so no memory-leak will occur even if we don't call it's - // destructor. + // here, from_address must be IPv4 address, namely we can regard from_address as a + // POD-type structure, so no memory-leak will occur even if we don't call it's + // destructor. // // generally, it is the from_node's primary address, except the // case described in message_ex::create_response()'s ATTENTION comment. @@ -113,8 +113,13 @@ class message_ex : public ref_counter, { public: message_header *header; - std::vector buffers; // header included for *send* message, - // header not included for *recieved* + // "buffers" are used to manage memory allocated for this message. + // the memory used by "header" is also mamanged in "buffers". + // + // please see "create_request", "create_recieve_message", + // "create_receive_message_with_standalone_header" for the details on + // how the headers managed by buffer + std::vector buffers; // by rpc and network rpc_session_ptr io_session; // send/recv session diff --git a/src/core/core/rpc_message.cpp b/src/core/core/rpc_message.cpp index 10a1c95fca..565c827d07 100644 --- a/src/core/core/rpc_message.cpp +++ b/src/core/core/rpc_message.cpp @@ -214,19 +214,25 @@ message_ex::message_ex() message_ex::~message_ex() { - // when receiving a request, the lifetime of message_header object is managed by - // vector "buffers" (see@create_receive_message function). - // if you call copy_and_prepare_send() function, then a new request will be created, - // but new request shares the same header with the old_request. - // so if old_request releases header, - // then new request's header will be invalid. + // coz message_header's memory is managed by vector "buffers", so its memory will be released + // after blobs in "buffers" are free. // - // so we don't call release_buffer_header(). - // however, this won't lead to any memory leak problems coz we can TREAT message_header - // as a POD type (the constructors of gpid and rpc_address are trival) - // see@ Attention of message_header + // however, the message_header's object is constructed with placement new + // in prepare_buffer_header, so the destructor won't be called automatically with the + // "free of blobs in buffers". + // + // strictly speaking, we should call release_header_buffer to trigger message_header's + // destructor, but we can't do this as the message_header may be shared with other + // rpc_message objects if you call "copy_and_prepare_send". + // + // so here we simply skip the release_header_buffer. Notice this won't lead to any + // memory leak problem as the header's destructor is trival: + // gpid -> we can treat it as POD type + // rpc_address -> only ipv4, we can treat it as POD type + // + // Please refer to comments on message_header's definition for details - // release_buffer_header(); + // release_header_buffer(); if (!_is_read) { dassert(_rw_committed, "message write is not committed"); }