Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 4, 2024
1 parent 47ca98c commit 555447a
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 84 deletions.
2 changes: 1 addition & 1 deletion idl/partition_split.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct register_child_request
1:dsn.layer2.app_info app;
2:dsn.layer2.partition_configuration parent_config;
3:dsn.layer2.partition_configuration child_config;
4:dsn.rpc_address primary_address;
4:dsn.rpc_address primary;
5:optional dsn.host_port hp_primary;
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ dsn::error_code replication_ddl_client::do_recovery(const std::vector<host_port>
for (const auto &node : replica_nodes) {
if (std::find(req->hp_recovery_set.begin(), req->hp_recovery_set.end(), node) !=
req->hp_recovery_set.end()) {
out << "duplicate replica node " << node.to_string() << ", just ingore it" << std::endl;
out << "duplicate replica node " << node << ", just ingore it" << std::endl;
} else {
req->hp_recovery_set.push_back(node);
req->recovery_set.push_back(_dns_resolver->resolve_address(node));
Expand Down
4 changes: 2 additions & 2 deletions src/common/consensus.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ struct learn_response
4:i64 prepare_start_decree; // prepare start decree
5:learn_type type = learn_type.LT_INVALID; // learning type: CACHE, LOG, APP
6:learn_state state; // learning data, including memory data and files
7:dsn.rpc_address address; // learnee's address
7:dsn.rpc_address learnee; // learnee's address
8:string base_local_dir; // base dir of files on learnee
9:optional string replica_disk_tag; // the disk tag of learnee located
10:optional dsn.host_port hp_address; // learnee's address
10:optional dsn.host_port hp_learnee; // learnee's host_port
}

struct learn_notify_response
Expand Down
48 changes: 24 additions & 24 deletions src/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,13 @@ std::string failure_detector::get_allow_list(const std::vector<std::string> &arg

void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon_ack &ack)
{
host_port hp_from_addr, hp_to_addr;
GET_HOST_PORT(beacon, from_addr, hp_from_addr);
GET_HOST_PORT(beacon, to_addr, hp_to_addr);
host_port hp_from_node, hp_to_node;
GET_HOST_PORT(beacon, from_node, hp_from_node);
GET_HOST_PORT(beacon, to_node, hp_to_node);

ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.__set_hp_this_node(hp_to_addr);
ack.this_node = beacon.to_node;
ack.__set_hp_this_node(hp_to_node);
ack.primary_node = dsn_primary_address();
ack.__set_hp_primary_node(dsn_primary_host_port());
ack.is_master = true;
Expand All @@ -370,22 +370,22 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon

uint64_t now = dsn_now_ms();

worker_map::iterator itr = _workers.find(hp_from_addr);
worker_map::iterator itr = _workers.find(hp_from_node);
if (itr == _workers.end()) {
// if is a new worker, check allow list first if need
if (_use_allow_list && _allow_list.find(hp_from_addr) == _allow_list.end()) {
LOG_WARNING("new worker[{}] is rejected", hp_from_addr);
if (_use_allow_list && _allow_list.find(hp_from_node) == _allow_list.end()) {
LOG_WARNING("new worker[{}] is rejected", hp_from_node);
ack.allowed = false;
return;
}

// create new entry for node
worker_record record(hp_from_addr, now);
worker_record record(hp_from_node, now);
record.is_alive = true;
_workers.insert(std::make_pair(hp_from_addr, record));
_workers.insert(std::make_pair(hp_from_node, record));

report(hp_from_addr, false, true);
on_worker_connected(hp_from_addr);
report(hp_from_node, false, true);
on_worker_connected(hp_from_node);
} else if (is_time_greater_than(now, itr->second.last_beacon_recv_time)) {
// update last_beacon_recv_time
itr->second.last_beacon_recv_time = now;
Expand All @@ -397,8 +397,8 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon
if (itr->second.is_alive == false) {
itr->second.is_alive = true;

report(hp_from_addr, false, true);
on_worker_connected(hp_from_addr);
report(hp_from_node, false, true);
on_worker_connected(hp_from_node);
}
} else {
LOG_INFO("now[{}] <= last_recv_time[{}]", now, itr->second.last_beacon_recv_time);
Expand Down Expand Up @@ -588,17 +588,17 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time)
const auto &addr = _dns_resolver->resolve_address(target);
beacon_msg beacon;
beacon.time = time;
beacon.from_addr = dsn_primary_address();
beacon.__set_hp_from_addr(dsn_primary_host_port());
beacon.to_addr = addr;
beacon.__set_hp_to_addr(target);
beacon.from_node = dsn_primary_address();
beacon.__set_hp_from_node(dsn_primary_host_port());
beacon.to_node = addr;
beacon.__set_hp_to_node(target);
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));

LOG_INFO("send ping message, from[{}({})], to[{}({})], time[{}]",
beacon.hp_from_addr,
beacon.from_addr,
beacon.hp_to_addr,
beacon.to_addr,
beacon.hp_from_node,
beacon.from_node,
beacon.hp_to_node,
beacon.to_node,
time);

::dsn::rpc::call(addr,
Expand All @@ -609,9 +609,9 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time)
if (err != ::dsn::ERR_OK) {
beacon_ack ack;
ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.this_node = beacon.to_node;
ack.primary_node.set_invalid();
ack.__set_hp_this_node(beacon.hp_to_addr);
ack.__set_hp_this_node(beacon.hp_to_node);
ack.__set_hp_primary_node(host_port());
ack.is_master = false;
ack.allowed = true;
Expand Down
8 changes: 4 additions & 4 deletions src/failure_detector/fd.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ namespace cpp dsn.fd
struct beacon_msg
{
1: i64 time;
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
2: dsn.rpc_address from_node;
3: dsn.rpc_address to_node;
4: optional i64 start_time;
5: optional dsn.host_port hp_from_addr;
6: optional dsn.host_port hp_to_addr;
5: optional dsn.host_port hp_from_node;
6: optional dsn.host_port hp_to_node;
}

struct beacon_ack
Expand Down
18 changes: 9 additions & 9 deletions src/failure_detector/test/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ class master_fd_test : public replication::meta_server_failure_detector
else {
LOG_DEBUG("ignore on ping, beacon msg, time[{}], from[{}], to[{}]",
beacon.time,
beacon.from_addr,
beacon.to_addr);
beacon.from_node,
beacon.to_node);
}
}

Expand Down Expand Up @@ -663,21 +663,21 @@ TEST(fd, update_stability)

dsn::rpc_replier<beacon_ack> r(create_fake_rpc_response());
beacon_msg msg;
msg.from_addr = rpc_address("localhost", 123);
msg.to_addr = rpc_address("localhost", MPORT_START);
msg.from_node = rpc_address("localhost", 123);
msg.to_node = rpc_address("localhost", MPORT_START);
msg.time = dsn_now_ms();
msg.__isset.start_time = true;
msg.start_time = 1000;
msg.__set_hp_from_addr(host_port("localhost", 123));
msg.__set_hp_to_addr(host_port("localhost", MPORT_START));
msg.__set_hp_from_node(host_port("localhost", 123));
msg.__set_hp_to_node(host_port("localhost", MPORT_START));

// first on ping
fd->on_ping(msg, r);
ASSERT_EQ(1, smap->size());
ASSERT_NE(smap->end(), smap->find(msg.hp_from_addr));
ASSERT_NE(smap->end(), smap->find(msg.hp_from_node));

replication::meta_server_failure_detector::worker_stability &ws =
smap->find(msg.hp_from_addr)->second;
smap->find(msg.hp_from_node)->second;
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_TRUE(r.is_empty());
Expand Down Expand Up @@ -745,7 +745,7 @@ TEST(fd, update_stability)
ASSERT_FALSE(r.is_empty());

// reset stat
fd->reset_stability_stat(msg.hp_from_addr);
fd->reset_stability_stat(msg.hp_from_node);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
}
Expand Down
56 changes: 28 additions & 28 deletions src/meta/meta_server_failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,46 +227,46 @@ bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
{
zauto_lock l(_map_lock);

host_port hp_from_addr;
GET_HOST_PORT(beacon, from_addr, hp_from_addr);
host_port hp_from_node;
GET_HOST_PORT(beacon, from_node, hp_from_node);

auto iter = _stablity.find(hp_from_addr);
auto iter = _stablity.find(hp_from_node);
if (iter == _stablity.end()) {
_stablity.emplace(hp_from_addr, worker_stability{beacon.start_time, 0});
_stablity.emplace(hp_from_node, worker_stability{beacon.start_time, 0});
return true;
} else {
worker_stability &w = iter->second;
if (beacon.start_time == w.last_start_time_ms) {
LOG_DEBUG("{}({}) isn't restarted, last_start_time({})",
hp_from_addr,
beacon.from_addr,
hp_from_node,
beacon.from_node,
w.last_start_time_ms);
if (dsn_now_ms() - w.last_start_time_ms >= FLAGS_stable_rs_min_running_seconds * 1000 &&
w.unstable_restart_count > 0) {
LOG_INFO("{}({}) has stably run for a while, reset it's unstable count({}) to 0",
hp_from_addr,
beacon.from_addr,
hp_from_node,
beacon.from_node,
w.unstable_restart_count);
w.unstable_restart_count = 0;
}
} else if (beacon.start_time > w.last_start_time_ms) {
LOG_INFO("check {}({}) restarted, last_time({}), this_time({})",
hp_from_addr,
beacon.from_addr,
hp_from_node,
beacon.from_node,
w.last_start_time_ms,
beacon.start_time);
if (beacon.start_time - w.last_start_time_ms <
FLAGS_stable_rs_min_running_seconds * 1000) {
w.unstable_restart_count++;
LOG_WARNING("{}({}) encounter an unstable restart, total_count({})",
hp_from_addr,
beacon.from_addr,
hp_from_node,
beacon.from_node,
w.unstable_restart_count);
} else if (w.unstable_restart_count > 0) {
LOG_INFO("{}({}) restart in {} ms after last restart, may recover ok, reset "
"it's unstable count({}) to 0",
hp_from_addr,
beacon.from_addr,
hp_from_node,
beacon.from_node,
beacon.start_time - w.last_start_time_ms,
w.unstable_restart_count);
w.unstable_restart_count = 0;
Expand All @@ -275,8 +275,8 @@ bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
w.last_start_time_ms = beacon.start_time;
} else {
LOG_WARNING("{}({}): possible encounter a staled message, ignore it",
hp_from_addr,
beacon.from_addr);
hp_from_node,
beacon.from_node);
}
return w.unstable_restart_count < FLAGS_max_succssive_unstable_restart;
}
Expand All @@ -285,21 +285,21 @@ bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon,
rpc_replier<fd::beacon_ack> &reply)
{
host_port hp_from_addr, hp_to_addr;
GET_HOST_PORT(beacon, from_addr, hp_from_addr);
GET_HOST_PORT(beacon, to_addr, hp_to_addr);
host_port hp_from_node, hp_to_node;
GET_HOST_PORT(beacon, from_node, hp_from_node);
GET_HOST_PORT(beacon, to_node, hp_to_node);

if (beacon.__isset.start_time && !update_stability_stat(beacon)) {
LOG_WARNING(
"{}({}) is unstable, don't response to it's beacon", beacon.from_addr, hp_from_addr);
"{}({}) is unstable, don't response to it's beacon", beacon.from_node, hp_from_node);
return;
}

fd::beacon_ack ack;
ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.this_node = beacon.to_node;
ack.allowed = true;
ack.__set_hp_this_node(hp_to_addr);
ack.__set_hp_this_node(hp_to_node);

dsn::host_port leader;
if (!get_leader(&leader)) {
Expand All @@ -308,19 +308,19 @@ void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon,
ack.__set_hp_primary_node(leader);
} else {
ack.is_master = true;
ack.primary_node = beacon.to_addr;
ack.__set_hp_primary_node(hp_to_addr);
ack.primary_node = beacon.to_node;
ack.__set_hp_primary_node(hp_to_node);
failure_detector::on_ping_internal(beacon, ack);
}

LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}({})), this_node({}({})), "
"primary_node({}({}))",
ack.time,
ack.is_master ? "true" : "false",
hp_from_addr,
beacon.from_addr,
hp_to_addr,
beacon.to_addr,
hp_from_node,
beacon.from_node,
hp_to_node,
beacon.to_node,
ack.hp_primary_node,
ack.primary_node);

Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_server_failure_detector.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class meta_server_failure_detector : public fd::failure_detector
virtual void on_ping(const fd::beacon_msg &beacon, rpc_replier<fd::beacon_ack> &reply) override;

private:
// return value: return true if beacon.from_addr is stable; or-else, false
// return value: return true if beacon.from_node is stable; or-else, false
bool update_stability_stat(const fd::beacon_msg &beacon);
void leader_initialize(const std::string &lock_service_owner);

Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_split_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
update_child_request->config = request.child_config;
update_child_request->info = *app;
update_child_request->type = config_type::CT_REGISTER_CHILD;
update_child_request->node = request.primary_address;
update_child_request->node = request.primary;
update_child_request->__set_hp_node(request.hp_primary);

partition_configuration child_config = app->partitions[child_gpid.get_partition_index()];
Expand Down
7 changes: 4 additions & 3 deletions src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,11 @@ error_code replica_follower::nfs_copy_checkpoint(error_code err, learn_response
return ERR_FILE_OPERATION_FAILED;
}

host_port hp;
GET_HOST_PORT(resp, address, hp);
host_port hp_learnee;
GET_HOST_PORT(resp, learnee, hp_learnee);

nfs_copy_remote_files(hp, resp.replica_disk_tag, resp.base_local_dir, resp.state.files, dest);
nfs_copy_remote_files(
hp_learnee, resp.replica_disk_tag, resp.base_local_dir, resp.state.files, dest);
return ERR_OK;
}

Expand Down
4 changes: 2 additions & 2 deletions src/replica/duplication/test/replica_follower_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ TEST_P(replica_follower_test, test_nfs_copy_checkpoint)
ASSERT_EQ(nfs_copy_checkpoint(follower, ERR_CORRUPTION, learn_response()), ERR_CORRUPTION);

auto resp = learn_response();
resp.address = rpc_address("127.0.0.1", 34801);
resp.__set_hp_address(host_port("localhost", 34801));
resp.learnee = rpc_address("127.0.0.1", 34801);
resp.__set_hp_learnee(host_port("localhost", 34801));

std::string dest = utils::filesystem::path_combine(
_mock_replica->dir(), duplication_constants::kDuplicationCheckpointRootDir);
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ void replica::on_query_last_checkpoint(/*out*/ learn_response &response)
// for example: base_local_dir = "./data" + "checkpoint.1024" = "./data/checkpoint.1024"
response.base_local_dir = utils::filesystem::path_combine(
_app->data_dir(), checkpoint_folder(response.state.to_decree_included));
response.address = _stub->primary_address();
response.__set_hp_address(_stub->primary_host_port());
response.learnee = _stub->primary_address();
response.__set_hp_learnee(_stub->primary_host_port());
for (auto &file : response.state.files) {
// response.state.files contain file absolute path, for example:
// "./data/checkpoint.1024/1.sst" use `substr` to get the file name: 1.sst
Expand Down
4 changes: 2 additions & 2 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
_prepare_list->count(),
learn_start_decree);

response.address = _stub->primary_address();
response.__set_hp_address(_stub->primary_host_port());
response.learnee = _stub->primary_address();
response.__set_hp_learnee(_stub->primary_host_port());
response.prepare_start_decree = invalid_decree;
response.last_committed_decree = local_committed_decree;
response.err = ERR_OK;
Expand Down
Loading

0 comments on commit 555447a

Please sign in to comment.