Skip to content

Commit

Permalink
fix macro for nfs & failure_detector
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Jun 20, 2023
1 parent ab9c746 commit 1a7dc17
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 142 deletions.
78 changes: 40 additions & 38 deletions src/failure_detector/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,36 +356,31 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon
ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.primary_node = dsn_primary_address();
ack.__set_hp_this_node(beacon.hp_to_addr);
ack.__set_hp_primary_node(dsn_primary_host_port());
ack.is_master = true;
ack.allowed = true;

zauto_lock l(_lock);

uint64_t now = dsn_now_ms();

host_port node;
if (beacon.__isset.host_port_from) {
node = beacon.host_port_from;
} else {
node = host_port(beacon.from_addr);
}

worker_map::iterator itr = _workers.find(node);
worker_map::iterator itr = _workers.find(beacon.hp_from_addr);
if (itr == _workers.end()) {
// if this is a new worker, check the allow list first if needed
if (_use_allow_list && _allow_list.find(node) == _allow_list.end()) {
LOG_WARNING("new worker[{}] is rejected", node);
if (_use_allow_list && _allow_list.find(beacon.hp_from_addr) == _allow_list.end()) {
LOG_WARNING("new worker[{}] is rejected", beacon.hp_from_addr);
ack.allowed = false;
return;
}

// create new entry for node
worker_record record(node, now);
worker_record record(beacon.hp_from_addr, now);
record.is_alive = true;
_workers.insert(std::make_pair(node, record));
_workers.insert(std::make_pair(beacon.hp_from_addr, record));

report(node, false, true);
on_worker_connected(node);
report(beacon.hp_from_addr, false, true);
on_worker_connected(beacon.hp_from_addr);
} else if (is_time_greater_than(now, itr->second.last_beacon_recv_time)) {
// update last_beacon_recv_time
itr->second.last_beacon_recv_time = now;
Expand All @@ -397,8 +392,8 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon
if (itr->second.is_alive == false) {
itr->second.is_alive = true;

report(node, false, true);
on_worker_connected(node);
report(beacon.hp_from_addr, false, true);
on_worker_connected(beacon.hp_from_addr);
}
} else {
LOG_INFO("now[{}] <= last_recv_time[{}]", now, itr->second.last_beacon_recv_time);
Expand All @@ -407,8 +402,12 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon

void failure_detector::on_ping(const beacon_msg &beacon, ::dsn::rpc_replier<beacon_ack> &reply)
{
beacon_msg msg = beacon;
FILL_HP_OPTIONAL_SECTION(msg, from_addr);
FILL_HP_OPTIONAL_SECTION(msg, to_addr);

beacon_ack ack;
on_ping_internal(beacon, ack);
on_ping_internal(msg, ack);
reply(ack);
}

Expand All @@ -422,37 +421,38 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack
/*
* the caller of end_ping_internal must already hold the lock!!!
*/
uint64_t beacon_send_time = ack.time;
host_port node;
if (ack.__isset.host_port_this_node) {
node = ack.host_port_this_node;
} else {
node = host_port(ack.this_node);
}
beacon_ack ack_msg = ack;
FILL_HP_OPTIONAL_SECTION(ack_msg, this_node);
FILL_HP_OPTIONAL_SECTION(ack_msg, primary_node);

uint64_t beacon_send_time = ack_msg.time;

if (err != ERR_OK) {
LOG_WARNING("ping master({}) failed, timeout_ms = {}, err = {}",
node,
ack_msg.hp_this_node,
_beacon_timeout_milliseconds,
err);
_recent_beacon_fail_count->increment();
}

master_map::iterator itr = _masters.find(node);
master_map::iterator itr = _masters.find(ack_msg.hp_this_node);

if (itr == _masters.end()) {
LOG_WARNING("received beacon ack without corresponding master, ignore it, "
"remote_master[{}], local_worker[{}]",
node,
"remote_master[{}({})], local_worker[{}]",
ack_msg.hp_this_node,
ack_msg.this_node,
dsn_primary_address());
return false;
}

master_record &record = itr->second;
if (!ack.allowed) {
if (!ack_msg.allowed) {
LOG_WARNING(
"worker rejected, stop sending beacon message, remote_master[{}], local_worker[{}]",
node,
"worker rejected, stop sending beacon message, remote_master[{}({})], local_worker[{}({})]",
ack_msg.hp_this_node,
ack_msg.this_node,
dsn_primary_host_port(),
dsn_primary_address());
record.rejected = true;
record.send_beacon_timer->cancel(true);
Expand All @@ -473,9 +473,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack
}

// if ack is not from master meta, worker should not update its last send time
if (!ack.is_master) {
if (!ack_msg.is_master) {
LOG_WARNING(
"node[{}] is not master, ack.primary_node[{}] is real master", node, ack.primary_node);
"node[{}({})] is not master, ack.primary_node[{}({})] is real master", ack_msg.hp_this_node, ack_msg.this_node, ack_msg.primary_node, ack_msg.hp_primary_node);
return true;
}

Expand All @@ -492,9 +492,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack
if (!record.is_alive && is_time_greater_than(now, record.last_send_time_for_beacon_with_ack) &&
now - record.last_send_time_for_beacon_with_ack <= _lease_milliseconds) {
// report master connected
report(node, true, true);
report(ack_msg.hp_this_node, true, true);
itr->second.is_alive = true;
on_master_connected(node);
on_master_connected(ack_msg.hp_this_node);
}

return true;
Expand Down Expand Up @@ -585,12 +585,12 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time)
beacon.time = time;
beacon.from_addr = dsn_primary_address();
beacon.to_addr = addr;
beacon.__set_host_port_from(dsn_primary_host_port());
beacon.__set_host_port_to(target);
beacon.__set_hp_from_addr(dsn_primary_host_port());
beacon.__set_hp_to_addr(target);
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));

LOG_INFO(
"send ping message, from[{}], to[{}], time[{}]", beacon.from_addr, beacon.to_addr, time);
"send ping message, from[{}({})], to[{}({})], time[{}]", beacon.hp_from_addr, beacon.from_addr, beacon.hp_to_addr, beacon.to_addr, time);

::dsn::rpc::call(addr,
RPC_FD_FAILURE_DETECTOR_PING,
Expand All @@ -602,6 +602,8 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time)
ack.time = beacon.time;
ack.this_node = beacon.to_addr;
ack.primary_node.set_invalid();
ack.__set_hp_this_node(beacon.hp_to_addr);
ack.__set_hp_primary_node(host_port());
ack.is_master = false;
ack.allowed = true;
end_ping(err, ack, nullptr);
Expand Down
53 changes: 23 additions & 30 deletions src/failure_detector/failure_detector_multimaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,55 +80,48 @@ void slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err,
const fd::beacon_ack &ack,
void *)
{
host_port this_node_hp, primary_node_hp;
if (ack.__isset.host_port_this_node) {
this_node_hp = ack.host_port_this_node;
} else {
this_node_hp = host_port(ack.this_node);
}
if (ack.__isset.host_port_primary_node) {
primary_node_hp = ack.host_port_primary_node;
} else {
primary_node_hp = host_port(ack.primary_node);
}
fd::beacon_ack ack_msg = ack;
FILL_HP_OPTIONAL_SECTION(ack_msg, this_node);
FILL_HP_OPTIONAL_SECTION(ack_msg, primary_node);

LOG_INFO("end ping result, error[{}], time[{}], ack.this_node[{}({})], ack.primary_node[{}({})], "
"ack.is_master[{}], ack.allowed[{}]",
err,
ack.time,
this_node_hp,
ack.this_node,
primary_node_hp,
ack.primary_node,
ack.is_master ? "true" : "false",
ack.allowed ? "true" : "false");
ack_msg.time,
ack_msg.hp_this_node,
ack_msg.this_node,
ack_msg.hp_primary_node,
ack_msg.primary_node,
ack_msg.is_master ? "true" : "false",
ack_msg.allowed ? "true" : "false");

zauto_lock l(failure_detector::_lock);
if (!failure_detector::end_ping_internal(err, ack))
if (!failure_detector::end_ping_internal(err, ack_msg))
return;

CHECK_EQ(this_node_hp, _meta_servers.group_host_port()->leader());
CHECK_EQ(ack_msg.hp_this_node, _meta_servers.group_host_port()->leader());

if (ERR_OK != err) {
host_port next = _meta_servers.group_host_port()->next(this_node_hp);
if (next != this_node_hp) {
host_port next = _meta_servers.group_host_port()->next(ack_msg.hp_this_node);
if (next != ack_msg.hp_this_node) {
_meta_servers.group_host_port()->set_leader(next);
// do not start next send_beacon() immediately to avoid sending rpcs too frequently
switch_master(this_node_hp, next, 1000);
switch_master(ack_msg.hp_this_node, next, 1000);
}
} else {
if (ack.is_master) {
if (ack_msg.is_master) {
// do nothing
} else if (ack.primary_node.is_invalid()) {
host_port next = _meta_servers.group_host_port()->next(this_node_hp);
if (next != this_node_hp) {
} else if (ack_msg.primary_node.is_invalid()) {
host_port next = _meta_servers.group_host_port()->next(ack_msg.hp_this_node);
if (next != ack_msg.hp_this_node) {
_meta_servers.group_host_port()->set_leader(next);
// do not start next send_beacon() immediately to avoid sending rpcs too frequently
switch_master(this_node_hp, next, 1000);
switch_master(ack_msg.hp_this_node, next, 1000);
}
} else {
_meta_servers.group_host_port()->set_leader(primary_node_hp);
_meta_servers.group_host_port()->set_leader(ack_msg.hp_primary_node);
// start next send_beacon() immediately because the leader is possibly right.
switch_master(this_node_hp, primary_node_hp, 0);
switch_master(ack_msg.hp_this_node, ack_msg.hp_primary_node, 0);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/failure_detector/fd.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ struct beacon_msg
2: dsn.rpc_address from_addr;
3: dsn.rpc_address to_addr;
4: optional i64 start_time;
5: optional dsn.host_port host_port_from;
6: optional dsn.host_port host_port_to;
5: optional dsn.host_port hp_from_addr;
6: optional dsn.host_port hp_to_addr;
}

struct beacon_ack
Expand All @@ -45,13 +45,13 @@ struct beacon_ack
3: dsn.rpc_address primary_node;
4: bool is_master;
5: bool allowed;
6: optional dsn.host_port host_port_this_node;
7: optional dsn.host_port host_port_primary_node;
6: optional dsn.host_port hp_this_node;
7: optional dsn.host_port hp_primary_node;
}

struct config_master_message
{
1: dsn.rpc_address master;
2: bool is_register;
3: optional dsn.host_port host_port_master;
3: optional dsn.host_port hp_master;
}
18 changes: 9 additions & 9 deletions src/failure_detector/test/failure_detector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class test_worker : public service_app, public serverlet<test_worker>
request.is_register ? "reg" : "unreg");

host_port master;
if (request.__isset.host_port_master) {
master = request.host_port_master;
if (request.__isset.hp_master) {
master = request.hp_master;
} else {
master = host_port(request.master);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ void worker_set_leader(test_worker *worker, int leader_contact)
config_master_message msg;
msg.master = rpc_address("localhost", MPORT_START + leader_contact);
msg.is_register = true;
msg.__set_host_port_master(host_port(msg.master));
msg.__set_hp_master(host_port(msg.master));
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
Expand All @@ -349,7 +349,7 @@ void clear(test_worker *worker, std::vector<test_master *> masters)
config_master_message msg;
msg.master = leader;
msg.is_register = false;
msg.__set_host_port_master(hp);
msg.__set_hp_master(hp);
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
Expand Down Expand Up @@ -671,16 +671,16 @@ TEST(fd, update_stability)
msg.time = dsn_now_ms();
msg.__isset.start_time = true;
msg.start_time = 1000;
msg.__set_host_port_from(host_port("localhost", 123));
msg.__set_host_port_to(host_port("localhost", MPORT_START));
msg.__set_hp_from_addr(host_port("localhost", 123));
msg.__set_hp_to_addr(host_port("localhost", MPORT_START));

// first on ping
fd->on_ping(msg, r);
ASSERT_EQ(1, smap->size());
ASSERT_NE(smap->end(), smap->find(msg.host_port_from));
ASSERT_NE(smap->end(), smap->find(msg.hp_from_addr));

replication::meta_server_failure_detector::worker_stability &ws =
smap->find(msg.host_port_from)->second;
smap->find(msg.hp_from_addr)->second;
ASSERT_EQ(0, ws.unstable_restart_count);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_TRUE(r.is_empty());
Expand Down Expand Up @@ -748,7 +748,7 @@ TEST(fd, update_stability)
ASSERT_FALSE(r.is_empty());

// reset stat
fd->reset_stability_stat(msg.host_port_from);
fd->reset_stability_stat(msg.hp_from_addr);
ASSERT_EQ(msg.start_time, ws.last_start_time_ms);
ASSERT_EQ(0, ws.unstable_restart_count);
}
Expand Down
Loading

0 comments on commit 1a7dc17

Please sign in to comment.