Skip to content

Commit

Permalink
lyc 1
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored and GehaFearless committed Jan 25, 2024
1 parent 0c302c5 commit 62ef062
Show file tree
Hide file tree
Showing 47 changed files with 411 additions and 425 deletions.
18 changes: 9 additions & 9 deletions idl/bulk_load.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct bulk_load_request
7:bulk_load_status meta_bulk_load_status;
8:bool query_bulk_load_metadata;
9:string remote_root_path;
10:optional dsn.host_port hp_primary_addr;
10:optional dsn.host_port hp_primary;
}

struct bulk_load_response
Expand Down Expand Up @@ -126,14 +126,14 @@ struct bulk_load_response
// primary -> secondary
struct group_bulk_load_request
{
1:string app_name;
2:dsn.rpc_address target_address;
3:metadata.replica_configuration config;
4:string provider_name;
5:string cluster_name;
6:bulk_load_status meta_bulk_load_status;
7:string remote_root_path;
8:optional dsn.host_port hp_target_address;
1:string app_name;
2:dsn.rpc_address target;
3:metadata.replica_configuration config;
4:string provider_name;
5:string cluster_name;
6:bulk_load_status meta_bulk_load_status;
7:string remote_root_path;
8:optional dsn.host_port hp_target;
}

struct group_bulk_load_response
Expand Down
14 changes: 7 additions & 7 deletions idl/partition_split.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ struct notify_cacth_up_response
// primary parent -> child replicas to update partition count
struct update_child_group_partition_count_request
{
1:dsn.rpc_address target_address;
1:dsn.rpc_address target;
2:i32 new_partition_count;
3:dsn.gpid child_pid;
4:i64 ballot;
5:optional dsn.host_port hp_target_address;
5:optional dsn.host_port hp_target;
}

struct update_child_group_partition_count_response
Expand All @@ -131,11 +131,11 @@ struct update_child_group_partition_count_response
// primary parent -> meta server, register child on meta_server
struct register_child_request
{
1:dsn.layer2.app_info app;
2:dsn.layer2.partition_configuration parent_config;
3:dsn.layer2.partition_configuration child_config;
4:dsn.rpc_address primary_address;
5:optional dsn.host_port hp_primary_address;
1:dsn.layer2.app_info app;
2:dsn.layer2.partition_configuration parent_config;
3:dsn.layer2.partition_configuration child_config;
4:dsn.rpc_address primary_address;
5:optional dsn.host_port hp_primary;
}

struct register_child_response
Expand Down
10 changes: 5 additions & 5 deletions src/common/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
const auto &dn = get_dir_node(pid_dir);
if (dsn_unlikely(nullptr == dn)) {
LOG_ERROR(
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_host_port(), pid_dir, pid);
return;
}

Expand All @@ -281,11 +281,11 @@ void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
}
if (!emplace_success) {
LOG_WARNING(
"{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
"{}: gpid({}) already in the dir_node({})", dsn_primary_host_port(), pid, dn->tag);
return;
}

LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_host_port(), pid, dn->tag);
}

dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
Expand Down Expand Up @@ -319,7 +319,7 @@ dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
if (selected != nullptr) {
LOG_INFO(
"{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
dsn_primary_address(),
dsn_primary_host_port(),
pid,
selected->tag,
least_app_replicas_count,
Expand Down Expand Up @@ -359,7 +359,7 @@ void fs_manager::remove_replica(const gpid &pid)
pid,
dn->tag);
if (r != 0) {
LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, dn->tag);
LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_host_port(), pid, dn->tag);
}
remove_count += r;
}
Expand Down
4 changes: 2 additions & 2 deletions src/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon

ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.primary_node = dsn_primary_address();
ack.__set_hp_this_node(hp_to_addr);
ack.primary_node = dsn_primary_address();
ack.__set_hp_primary_node(dsn_primary_host_port());
ack.is_master = true;
ack.allowed = true;
Expand Down Expand Up @@ -589,8 +589,8 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time)
beacon_msg beacon;
beacon.time = time;
beacon.from_addr = dsn_primary_address();
beacon.to_addr = addr;
beacon.__set_hp_from_addr(dsn_primary_host_port());
beacon.to_addr = addr;
beacon.__set_hp_to_addr(target);
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));

Expand Down
8 changes: 4 additions & 4 deletions src/failure_detector/test/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ class test_worker : public service_app, public serverlet<test_worker>
request.master,
request.is_register ? "reg" : "unreg");

host_port master;
GET_HOST_PORT(request, master, master);
host_port hp_master;
GET_HOST_PORT(request, master, hp_master);

if (request.is_register)
_worker_fd->register_master(master);
_worker_fd->register_master(hp_master);
else
_worker_fd->unregister_master(master);
_worker_fd->unregister_master(hp_master);
response = true;
}

Expand Down
3 changes: 3 additions & 0 deletions src/meta/greedy_load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "meta/meta_data.h"
#include "meta_admin_types.h"
#include "server_load_balancer.h"
#include "utils/fmt_utils.h"

namespace dsn {
class command_deregister;
Expand Down Expand Up @@ -101,3 +102,5 @@ inline configuration_proposal_action new_proposal_action(const rpc_address &targ

} // namespace replication
} // namespace dsn

USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::configuration_proposal_action);
17 changes: 8 additions & 9 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
req->pid = pid;
req->app_name = app_name;
req->primary_addr = primary_addr;
req->__set_hp_primary_addr(primary_hp);
req->__set_hp_primary(primary_hp);
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
Expand All @@ -459,8 +459,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
// remote server maybe not supported host_post, just have address
auto &bulk_load_resp = rpc.response();
if (!bulk_load_resp.__isset.hp_group_bulk_load_state) {
bulk_load_resp.__set_hp_group_bulk_load_state(
std::map<host_port, partition_bulk_load_state>());
bulk_load_resp.__set_hp_group_bulk_load_state({});
for (const auto &kv : bulk_load_resp.group_bulk_load_state) {
auto hp = host_port(kv.first);
bulk_load_resp.hp_group_bulk_load_state[hp] = kv.second;
Expand All @@ -479,7 +478,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
const std::string &app_name = request.app_name;
const gpid &pid = request.pid;
const auto &primary_addr = request.primary_addr;
const auto &primary_hp = request.hp_primary_addr;
const auto &primary_hp = request.hp_primary;

if (err != ERR_OK) {
LOG_ERROR("app({}), partition({}) failed to receive bulk load response from node({}({})), "
Expand All @@ -488,7 +487,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
pid,
primary_hp,
primary_addr,
err.to_string());
err);
try_rollback_to_downloading(app_name, pid);
try_resend_bulk_load_request(app_name, pid);
return;
Expand Down Expand Up @@ -1036,10 +1035,10 @@ void bulk_load_service::update_partition_info_unlock(const gpid &pid,
// no need to update other field of partition_bulk_load_info
return;
}
pinfo.addresses.clear();
pinfo.host_ports.clear();
const auto &state = _partitions_bulk_load_state[pid];
for (const auto &kv : state) {
pinfo.addresses.emplace_back(kv.first);
pinfo.host_ports.emplace_back(kv.first);
}
pinfo.ever_ingest_succeed = true;
}
Expand Down Expand Up @@ -1216,9 +1215,9 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati
current_nodes.emplace_back(secondary);
}

std::sort(pinfo.addresses.begin(), pinfo.addresses.end());
std::sort(pinfo.host_ports.begin(), pinfo.host_ports.end());
std::sort(current_nodes.begin(), current_nodes.end());
if (current_nodes == pinfo.addresses) {
if (current_nodes == pinfo.host_ports) {
LOG_INFO("app({}) partition({}) has already executed ingestion succeed", app_name, pid);
update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
return true;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ struct partition_bulk_load_info
bulk_load_status::type status;
bulk_load_metadata metadata;
bool ever_ingest_succeed;
std::vector<host_port> addresses;
DEFINE_JSON_SERIALIZATION(status, metadata, ever_ingest_succeed, addresses)
std::vector<host_port> host_ports;
DEFINE_JSON_SERIALIZATION(status, metadata, ever_ingest_succeed, host_ports)
};

// Used for remote file provider
Expand Down
4 changes: 2 additions & 2 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ app_state::app_state(const app_info &info) : app_info(info), helpers(new app_sta
config.secondaries.clear();

config.__set_hp_primary(host_port());
config.__set_hp_secondaries(std::vector<host_port>());
config.__set_hp_last_drops(std::vector<host_port>());
config.__set_hp_secondaries({});
config.__set_hp_last_drops({});

partitions.assign(app_info::partition_count, config);
for (int i = 0; i != app_info::partition_count; ++i)
Expand Down
3 changes: 1 addition & 2 deletions src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "meta_http_service.h"
#include "meta_server_failure_detector.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "server_load_balancer.h"
#include "server_state.h"
Expand Down Expand Up @@ -477,7 +476,7 @@ void meta_http_service::get_cluster_info_handler(const http_request &req, http_r
}
}
tp.add_row_name_and_data("meta_servers", meta_servers_str);
tp.add_row_name_and_data("primary_meta_server", dsn_primary_address().to_string());
tp.add_row_name_and_data("primary_meta_server", dsn_primary_host_port().to_string());
tp.add_row_name_and_data("zookeeper_hosts", dsn::dist::FLAGS_hosts_list);
tp.add_row_name_and_data("zookeeper_root", _service->_cluster_root);
tp.add_row_name_and_data(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_split_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
update_child_request->info = *app;
update_child_request->type = config_type::CT_REGISTER_CHILD;
update_child_request->node = request.primary_address;
update_child_request->__set_hp_node(request.hp_primary_address);
update_child_request->__set_hp_node(request.hp_primary);

partition_configuration child_config = app->partitions[child_gpid.get_partition_index()];
child_config.secondaries = request.child_config.secondaries;
Expand Down
44 changes: 21 additions & 23 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "meta/partition_guardian.h"

#include <fmt/core.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
#include <inttypes.h>
#include <stdio.h>
Expand All @@ -26,6 +27,7 @@

#include "common/replication_common.h"
#include "common/replication_other_types.h"
#include "meta/greedy_load_balancer.h" // IWYU pragma: keep
#include "meta/meta_data.h"
#include "meta/meta_service.h"
#include "meta/server_load_balancer.h"
Expand Down Expand Up @@ -129,8 +131,8 @@ void partition_guardian::reconfig(meta_view view, const configuration_update_req

CHECK(cc->record_drop_history(request.hp_node),
"node({}({})) has been in the dropped",
request.hp_node.to_string(),
request.node.to_string());
request.hp_node,
request.node);
}
});
}
Expand All @@ -149,25 +151,25 @@ bool partition_guardian::from_proposals(meta_view &view,
return false;
}
action = *(cc.lb_actions.front());
char reason[1024];
std::string reason;
if (action.target.is_invalid() || action.hp_target.is_invalid()) {
sprintf(reason, "action target is invalid");
reason = "action target is invalid";
goto invalid_action;
}
if (action.node.is_invalid() || action.hp_node.is_invalid()) {
sprintf(reason, "action node is invalid");
reason = "action node is invalid";
goto invalid_action;
}
if (!is_node_alive(*(view.nodes), action.hp_target)) {
sprintf(reason, "action target(%s) is not alive", action.hp_target.to_string().c_str());
reason = fmt::format("action target({}) is not alive", action.hp_target);
goto invalid_action;
}
if (!is_node_alive(*(view.nodes), action.hp_node)) {
sprintf(reason, "action node(%s) is not alive", action.hp_node.to_string().c_str());
reason = fmt::format("action node({}) is not alive", action.hp_node);
goto invalid_action;
}
if (cc.lb_actions.is_abnormal_learning_proposal()) {
sprintf(reason, "learning process abnormal");
reason = "learning process abnormal";
goto invalid_action;
}

Expand Down Expand Up @@ -197,16 +199,15 @@ bool partition_guardian::from_proposals(meta_view &view,
break;
}

if (is_action_valid)
if (is_action_valid) {
return true;
else
sprintf(reason, "action is invalid");
} else {
reason = "action is invalid";
}

invalid_action:
std::stringstream ss;
ss << action;
LOG_INFO("proposal action({}) for gpid({}) is invalid, clear all proposal actions: {}",
ss.str(),
action,
gpid,
reason);
action.type = config_type::CT_INVALID;
Expand Down Expand Up @@ -240,8 +241,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi

for (int i = 0; i < pc.hp_secondaries.size(); ++i) {
node_state *ns = get_node_state(*(view.nodes), pc.hp_secondaries[i], false);
CHECK_NOTNULL(
ns, "invalid secondary address, address = {}", pc.hp_secondaries[i].to_string());
CHECK_NOTNULL(ns, "invalid secondary address, address = {}", pc.hp_secondaries[i]);
if (!ns->alive())
continue;

Expand Down Expand Up @@ -367,10 +367,10 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
if (ns == nullptr || !ns->alive()) {
ready = false;
reason =
"the last dropped node(" + nodes[i].to_string() + ") haven't come back yet";
fmt::format("the last dropped node({}) haven't come back yet", nodes[i]);
LOG_WARNING("{}: don't select primary: {}", gpid_name, reason);
} else {
std::vector<dropped_replica>::iterator it = cc.find_from_dropped(nodes[i]);
const auto &it = cc.find_from_dropped(nodes[i]);
if (it == cc.dropped.end() || it->ballot == invalid_ballot) {
if (ns->has_collected()) {
LOG_INFO("{}: ignore {}'s replica info as it doesn't exist on "
Expand All @@ -380,8 +380,8 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
collected_info[i] = {nodes[i], 0, -1, -1, -1};
} else {
ready = false;
reason = "the last dropped node(" + nodes[i].to_string() +
") is unavailable because ";
reason = fmt::format(
"the last dropped node({}) is unavailable because ", nodes[i]);
if (it == cc.dropped.end()) {
reason += "the node is not exist in dropped_nodes";
} else {
Expand Down Expand Up @@ -625,9 +625,7 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
// if not emergency, only try to recover last dropped server
const dropped_replica &server = cc.dropped.back();
if (is_node_alive(*view.nodes, server.node)) {
CHECK(!server.node.is_invalid(),
"invalid server address, address = {}",
server.node.to_string());
CHECK(!server.node.is_invalid(), "invalid server address, address = {}", server.node);
action.hp_node = server.node;
action.node = _svc->get_dns_resolver()->resolve_address(server.node);
}
Expand Down
Loading

0 comments on commit 62ef062

Please sign in to comment.