Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: java client support FQDN #1649

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions idl/bulk_load.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct bulk_load_request
7:bulk_load_status meta_bulk_load_status;
8:bool query_bulk_load_metadata;
9:string remote_root_path;
10:optional dsn.host_port hp_primary_addr;
}

struct bulk_load_response
Expand All @@ -119,6 +120,7 @@ struct bulk_load_response
8:optional bool is_group_ingestion_finished;
9:optional bool is_group_bulk_load_context_cleaned_up;
10:optional bool is_group_bulk_load_paused;
11:optional map<dsn.host_port, partition_bulk_load_state> hp_group_bulk_load_state;
}

// primary -> secondary
Expand All @@ -131,6 +133,7 @@ struct group_bulk_load_request
5:string cluster_name;
6:bulk_load_status meta_bulk_load_status;
7:string remote_root_path;
8:optional dsn.host_port hp_target_address;
}

struct group_bulk_load_response
Expand Down Expand Up @@ -218,6 +221,7 @@ struct query_bulk_load_response
6:list<map<dsn.rpc_address, partition_bulk_load_state>> bulk_load_states;
7:optional string hint_msg;
8:optional bool is_bulk_loading;
9:optional list<map<dsn.host_port, partition_bulk_load_state>> hp_bulk_load_states;
}

struct clear_bulk_load_state_request
Expand Down
3 changes: 3 additions & 0 deletions idl/dsn.layer2.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ struct partition_configuration
6:list<dsn.rpc_address> last_drops;
7:i64 last_committed_decree;
8:i32 partition_flags;
9:optional dsn.host_port hp_primary;
10:optional list<dsn.host_port> hp_secondaries;
11:optional list<dsn.host_port> hp_last_drops;
}

struct query_cfg_request
Expand Down
1 change: 1 addition & 0 deletions idl/duplication.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct duplication_sync_request
1:dsn.rpc_address node;

2:map<dsn.gpid, list<duplication_confirm_entry>> confirm_list;
3:dsn.host_port hp_node;
}

struct duplication_sync_response
Expand Down
8 changes: 8 additions & 0 deletions idl/meta_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ struct configuration_update_request
// the `meta_split_status` will be set
// only used when on_config_sync
6:optional metadata.split_status meta_split_status;
7:optional dsn.host_port hp_node;
}

// meta server (config mgr) => primary | secondary (downgrade) (w/ new config)
Expand Down Expand Up @@ -103,6 +104,7 @@ struct configuration_query_by_node_request
1:dsn.rpc_address node;
2:optional list<metadata.replica_info> stored_replicas;
3:optional replica_server_info info;
4:optional dsn.host_port hp_node;
}

struct configuration_query_by_node_response
Expand All @@ -117,6 +119,7 @@ struct configuration_recovery_request
1:list<dsn.rpc_address> recovery_set;
2:bool skip_bad_nodes;
3:bool skip_lost_partitions;
4:optional list<dsn.host_port> hp_recovery_set;
}

struct configuration_recovery_response
Expand Down Expand Up @@ -205,6 +208,7 @@ struct configuration_list_apps_response
struct query_app_info_request
{
1:dsn.rpc_address meta_server;
2:optional dsn.host_port hp_meta_server;
}

struct query_app_info_response
Expand Down Expand Up @@ -280,6 +284,7 @@ struct node_info
{
1:node_status status = node_status.NS_INVALID;
2:dsn.rpc_address address;
3:optional dsn.host_port hp_address;
}

struct configuration_list_nodes_request
Expand Down Expand Up @@ -349,6 +354,8 @@ struct configuration_proposal_action
// depricated now
// new fields of this struct should start with 5
// 4:i64 period_ts;
5:optional dsn.host_port hp_target;
6:optional dsn.host_port hp_node;
}

struct configuration_balancer_request
Expand Down Expand Up @@ -381,6 +388,7 @@ struct ddd_node_info
5:i64 ballot; // collected && ballot == -1 means replica not exist on this node
6:i64 last_committed_decree;
7:i64 last_prepared_decree;
8:optional dsn.host_port hp_node;
}

struct ddd_partition_info
Expand Down
1 change: 1 addition & 0 deletions idl/metadata.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct replica_configuration
// 2. false - secondary copy mutation in this prepare message asynchronously
// NOTICE: it should always be false when update_local_configuration
7:optional bool split_sync_to_child = false;
8:optional dsn.host_port hp_primary;
}

struct replica_info
Expand Down
3 changes: 3 additions & 0 deletions idl/partition_split.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct notify_catch_up_request
2:dsn.gpid child_gpid;
3:i64 child_ballot;
4:dsn.rpc_address child_address;
5:optional dsn.host_port hp_child_address;
}

struct notify_cacth_up_response
Expand All @@ -116,6 +117,7 @@ struct update_child_group_partition_count_request
2:i32 new_partition_count;
3:dsn.gpid child_pid;
4:i64 ballot;
5:optional dsn.host_port hp_target_address;
}

struct update_child_group_partition_count_response
Expand All @@ -133,6 +135,7 @@ struct register_child_request
2:dsn.layer2.partition_configuration parent_config;
3:dsn.layer2.partition_configuration child_config;
4:dsn.rpc_address primary_address;
5:optional dsn.host_port hp_primary_address;
}

struct register_child_response
Expand Down
1 change: 1 addition & 0 deletions idl/replica_admin.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct query_disk_info_request
{
1:dsn.rpc_address node;
2:string app_name;
3:optional dsn.host_port hp_node;
}

// This response is from replica_server to client.
Expand Down
2 changes: 1 addition & 1 deletion scripts/recompile_thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ rm -rf $TMP_DIR
mkdir -p $TMP_DIR
$THIRDPARTY_ROOT/output/bin/thrift --gen cpp:moveable_types -out $TMP_DIR ../idl/rrdb.thrift

sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "dsn_types.h"/#include "runtime\/rpc\/rpc_address.h"\n#include "runtime\/rpc\/rpc_host_port.h"\n#include "runtime\/task\/task_code.h"\n#include "utils\/blob.h"/' $TMP_DIR/rrdb_types.h > ../src/include/rrdb/rrdb_types.h
sed 's/#include "rrdb_types.h"/#include <rrdb\/rrdb_types.h>/' $TMP_DIR/rrdb_types.cpp > ../src/base/rrdb_types.cpp

rm -rf $TMP_DIR
Expand Down
4 changes: 4 additions & 0 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct learn_request
// be duplicated (ie. max_gced_decree < confirmed_decree), if not,
// learnee will copy the missing logs.
7:optional i64 max_gced_decree;
8:optional dsn.host_port hp_learner;
}

struct learn_response
Expand All @@ -156,6 +157,7 @@ struct learn_response
7:dsn.rpc_address address; // learnee's address
8:string base_local_dir; // base dir of files on learnee
9:optional string replica_disk_tag; // the disk tag of learnee located
10:optional dsn.host_port hp_address; // learnee's address
}

struct learn_notify_response
Expand All @@ -180,6 +182,7 @@ struct group_check_request
// Used to deliver child gpid and meta_split_status during partition split
6:optional dsn.gpid child_gpid;
7:optional metadata.split_status meta_split_status;
8:optional dsn.host_port hp_node;
}

struct group_check_response
Expand All @@ -195,5 +198,6 @@ struct group_check_response
// if secondary pause or cancel split succeed, is_split_stopped = true
8:optional bool is_split_stopped;
9:optional metadata.disk_status disk_status = metadata.disk_status.NORMAL;
10:optional dsn.host_port hp_node;
}

27 changes: 16 additions & 11 deletions src/failure_detector/fd.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,28 @@ namespace cpp dsn.fd

struct beacon_msg
{
1: i64 time;
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
4: optional i64 start_time;
1: i64 time;
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
4: optional i64 start_time;
5: optional dsn.host_port hp_from_addr;
6: optional dsn.host_port hp_to_addr;
}

struct beacon_ack
{
1: i64 time;
2: dsn.rpc_address this_node;
3: dsn.rpc_address primary_node;
4: bool is_master;
5: bool allowed;
1: i64 time;
2: dsn.rpc_address this_node;
3: dsn.rpc_address primary_node;
4: bool is_master;
5: bool allowed;
6: optional dsn.host_port hp_this_node;
7: optional dsn.host_port hp_primary_node;
}

struct config_master_message
{
1: dsn.rpc_address master;
2: bool is_register;
1: dsn.rpc_address master;
2: bool is_register;
3: optional dsn.host_port hp_master;
}
2 changes: 2 additions & 0 deletions src/nfs/nfs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct copy_request
8: bool overwrite;
9: optional string source_disk_tag;
10: optional dsn.gpid pid;
11: optional dsn.host_port hp_source;
}

struct copy_response
Expand All @@ -60,6 +61,7 @@ struct get_file_size_request
6: optional string source_disk_tag;
7: optional string dest_disk_tag;
8: optional dsn.gpid pid;
9: optional dsn.host_port hp_source;
}

struct get_file_size_response
Expand Down
6 changes: 4 additions & 2 deletions src/runtime/rpc/group_host_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <vector>

#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
Expand Down Expand Up @@ -131,7 +130,10 @@ inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
CHECK_TRUE(add(host_port(addr)));
}
_update_leader_automatically = g_addr->is_update_leader_automatically();
set_leader(host_port(g_addr->leader()));
auto leader_addr = g_addr->leader();
if (rpc_address::s_invalid_address != leader_addr) {
set_leader(host_port(leader_addr));
}
}

inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
Expand Down
40 changes: 39 additions & 1 deletion src/runtime/rpc/rpc_host_port.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/rpc/rpc_host_port.h"
#include "utils/error_code.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_conv.h"
#include "utils/utils.h"

namespace dsn {
Expand Down Expand Up @@ -68,8 +69,35 @@ host_port::host_port(std::string host, uint16_t port)
CHECK_NE_MSG(rpc_address::ipv4_from_host(_host.c_str()), 0, "invalid hostname: {}", _host);
}

bool host_port::from_string(const std::string s)
{
std::string ip_port = s;
auto pos = ip_port.find_last_of(':');
if (pos == std::string::npos) {
return false;
}
std::string host = ip_port.substr(0, pos);
std::string port = ip_port.substr(pos + 1);

// check port
unsigned int port_num;
if (!internal::buf2unsigned(port, port_num) || port_num > UINT16_MAX) {
return false;
}

if (rpc_address::ipv4_from_host(host.c_str()) == 0) {
return false;
}

_type = HOST_TYPE_IPV4;
_host = host;
_port = port_num;
return true;
}

host_port::host_port(rpc_address addr)
{
reset();
switch (addr.type()) {
case HOST_TYPE_IPV4: {
CHECK(utils::hostname_from_ip(htonl(addr.ip()), &_host),
Expand All @@ -79,6 +107,7 @@ host_port::host_port(rpc_address addr)
} break;
case HOST_TYPE_GROUP: {
_group_host_port = new rpc_group_host_port(addr.group_address());
_group_host_port->add_ref();
} break;
default:
break;
Expand Down Expand Up @@ -133,7 +162,7 @@ std::string host_port::to_string() const
case HOST_TYPE_GROUP:
return fmt::format("address group {}", group_host_port()->name());
default:
return "invalid address";
return "invalid host_port";
}
}

Expand Down Expand Up @@ -198,4 +227,13 @@ error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
return error_s::ok();
}

void host_port::fill_host_ports_from_addresses(const std::vector<rpc_address> &addr_v,
std::vector<host_port> &hp_v)
{
CHECK(hp_v.empty(), "optional host_port should be empty!");
for (const auto &addr : addr_v) {
hp_v.emplace_back(host_port(addr));
}
}

} // namespace dsn
Loading
Loading