From 1a7dc177096ed1aba3c63bf33abbcdee5db525a3 Mon Sep 17 00:00:00 2001 From: liguohao <948193394@qq.com> Date: Tue, 20 Jun 2023 21:22:19 +0800 Subject: [PATCH] fix macro for nfs & failure_detector --- src/failure_detector/failure_detector.cpp | 78 ++++++++++--------- .../failure_detector_multimaster.cpp | 53 ++++++------- src/failure_detector/fd.thrift | 10 +-- .../test/failure_detector.cpp | 18 ++--- src/meta/meta_server_failure_detector.cpp | 65 ++++++---------- src/nfs/nfs.thrift | 4 +- src/nfs/nfs_client_impl.cpp | 22 ++---- 7 files changed, 108 insertions(+), 142 deletions(-) diff --git a/src/failure_detector/failure_detector.cpp b/src/failure_detector/failure_detector.cpp index 45737b287a..70a8600601 100644 --- a/src/failure_detector/failure_detector.cpp +++ b/src/failure_detector/failure_detector.cpp @@ -356,6 +356,8 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon ack.time = beacon.time; ack.this_node = beacon.to_addr; ack.primary_node = dsn_primary_address(); + ack.__set_hp_this_node(beacon.hp_to_addr); + ack.__set_hp_primary_node(dsn_primary_host_port()); ack.is_master = true; ack.allowed = true; @@ -363,29 +365,22 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon uint64_t now = dsn_now_ms(); - host_port node; - if (beacon.__isset.host_port_from) { - node = beacon.host_port_from; - } else { - node = host_port(beacon.from_addr); - } - - worker_map::iterator itr = _workers.find(node); + worker_map::iterator itr = _workers.find(beacon.hp_from_addr); if (itr == _workers.end()) { // if is a new worker, check allow list first if need - if (_use_allow_list && _allow_list.find(node) == _allow_list.end()) { - LOG_WARNING("new worker[{}] is rejected", node); + if (_use_allow_list && _allow_list.find(beacon.hp_from_addr) == _allow_list.end()) { + LOG_WARNING("new worker[{}] is rejected", beacon.hp_from_addr); ack.allowed = false; return; } // create new entry for node - worker_record record(node, now); + worker_record record(beacon.hp_from_addr, now); record.is_alive = true; - _workers.insert(std::make_pair(node, record)); + _workers.insert(std::make_pair(beacon.hp_from_addr, record)); - report(node, false, true); - on_worker_connected(node); + report(beacon.hp_from_addr, false, true); + on_worker_connected(beacon.hp_from_addr); } else if (is_time_greater_than(now, itr->second.last_beacon_recv_time)) { // update last_beacon_recv_time itr->second.last_beacon_recv_time = now; @@ -397,8 +392,8 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon if (itr->second.is_alive == false) { itr->second.is_alive = true; - report(node, false, true); - on_worker_connected(node); + report(beacon.hp_from_addr, false, true); + on_worker_connected(beacon.hp_from_addr); } } else { LOG_INFO("now[{}] <= last_recv_time[{}]", now, itr->second.last_beacon_recv_time); @@ -407,8 +402,12 @@ void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ beacon void failure_detector::on_ping(const beacon_msg &beacon, ::dsn::rpc_replier &reply) { + beacon_msg msg = beacon; + FILL_HP_OPTIONAL_SECTION(msg, from_addr); + FILL_HP_OPTIONAL_SECTION(msg, to_addr); + beacon_ack ack; - on_ping_internal(beacon, ack); + on_ping_internal(msg, ack); reply(ack); } @@ -422,37 +421,38 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack /* * the caller of the end_ping_internal should lock necessarily!!! */ - uint64_t beacon_send_time = ack.time; - host_port node; - if (ack.__isset.host_port_this_node) { - node = ack.host_port_this_node; - } else { - node = host_port(ack.this_node); - } + beacon_ack ack_msg = ack; + FILL_HP_OPTIONAL_SECTION(ack_msg, this_node); + FILL_HP_OPTIONAL_SECTION(ack_msg, primary_node); + + uint64_t beacon_send_time = ack_msg.time; if (err != ERR_OK) { LOG_WARNING("ping master({}) failed, timeout_ms = {}, err = {}", - node, + ack_msg.hp_this_node, _beacon_timeout_milliseconds, err); _recent_beacon_fail_count->increment(); } - master_map::iterator itr = _masters.find(node); + master_map::iterator itr = _masters.find(ack_msg.hp_this_node); if (itr == _masters.end()) { LOG_WARNING("received beacon ack without corresponding master, ignore it, " - "remote_master[{}], local_worker[{}]", - node, + "remote_master[{}({})], local_worker[{}]", + ack_msg.hp_this_node, + ack_msg.this_node, dsn_primary_address()); return false; } master_record &record = itr->second; - if (!ack.allowed) { + if (!ack_msg.allowed) { LOG_WARNING( - "worker rejected, stop sending beacon message, remote_master[{}], local_worker[{}]", - node, + "worker rejected, stop sending beacon message, remote_master[{}({})], local_worker[{}({})]", + ack_msg.hp_this_node, + ack_msg.this_node, + dsn_primary_host_port(), dsn_primary_address()); record.rejected = true; record.send_beacon_timer->cancel(true); @@ -473,9 +473,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack } // if ack is not from master meta, worker should not update its last send time - if (!ack.is_master) { + if (!ack_msg.is_master) { LOG_WARNING( - "node[{}] is not master, ack.primary_node[{}] is real master", node, ack.primary_node); + "node[{}({})] is not master, ack.primary_node[{}({})] is real master", ack_msg.hp_this_node, ack_msg.this_node, ack_msg.primary_node, ack_msg.hp_primary_node); return true; } @@ -492,9 +492,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code err, const beacon_ack if (!record.is_alive && is_time_greater_than(now, record.last_send_time_for_beacon_with_ack) && now - record.last_send_time_for_beacon_with_ack <= _lease_milliseconds) { // report master connected - report(node, true, true); + report(ack_msg.hp_this_node, true, true); itr->second.is_alive = true; - on_master_connected(node); + on_master_connected(ack_msg.hp_this_node); } return true; @@ -585,12 +585,12 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time) beacon.time = time; beacon.from_addr = dsn_primary_address(); beacon.to_addr = addr; - beacon.__set_host_port_from(dsn_primary_host_port()); - beacon.__set_host_port_to(target); + beacon.__set_hp_from_addr(dsn_primary_host_port()); + beacon.__set_hp_to_addr(target); beacon.__set_start_time(static_cast(dsn::utils::process_start_millis())); LOG_INFO( - "send ping message, from[{}], to[{}], time[{}]", beacon.from_addr, beacon.to_addr, time); + "send ping message, from[{}({})], to[{}({})], time[{}]", beacon.hp_from_addr, beacon.from_addr, beacon.hp_to_addr, beacon.to_addr, time); ::dsn::rpc::call(addr, RPC_FD_FAILURE_DETECTOR_PING, @@ -602,6 +602,8 @@ void failure_detector::send_beacon(::dsn::host_port target, uint64_t time) ack.time = beacon.time; ack.this_node = beacon.to_addr; ack.primary_node.set_invalid(); + ack.__set_hp_this_node(beacon.hp_to_addr); + ack.__set_hp_primary_node(host_port()); ack.is_master = false; ack.allowed = true; end_ping(err, ack, nullptr); diff --git a/src/failure_detector/failure_detector_multimaster.cpp b/src/failure_detector/failure_detector_multimaster.cpp index 7c2202b29f..c2d34c1468 100644 --- a/src/failure_detector/failure_detector_multimaster.cpp +++ b/src/failure_detector/failure_detector_multimaster.cpp @@ -80,55 +80,48 @@ void slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err, const fd::beacon_ack &ack, void *) { - host_port this_node_hp, primary_node_hp; - if (ack.__isset.host_port_this_node) { - this_node_hp = ack.host_port_this_node; - } else { - this_node_hp = host_port(ack.this_node); - } - if (ack.__isset.host_port_primary_node) { - primary_node_hp = ack.host_port_primary_node; - } else { - primary_node_hp = host_port(ack.primary_node); - } + fd::beacon_ack ack_msg = ack; + FILL_HP_OPTIONAL_SECTION(ack_msg, this_node); + FILL_HP_OPTIONAL_SECTION(ack_msg, primary_node); + LOG_INFO("end ping result, error[{}], time[{}], ack.this_node[{}({})], ack.primary_node[{}({})], " "ack.is_master[{}], ack.allowed[{}]", err, - ack.time, - this_node_hp, - ack.this_node, - primary_node_hp, - ack.primary_node, - ack.is_master ? "true" : "false", - ack.allowed ? "true" : "false"); + ack_msg.time, + ack_msg.hp_this_node, + ack_msg.this_node, + ack_msg.hp_primary_node, + ack_msg.primary_node, + ack_msg.is_master ? "true" : "false", + ack_msg.allowed ? "true" : "false"); zauto_lock l(failure_detector::_lock); - if (!failure_detector::end_ping_internal(err, ack)) + if (!failure_detector::end_ping_internal(err, ack_msg)) return; - CHECK_EQ(this_node_hp, _meta_servers.group_host_port()->leader()); + CHECK_EQ(ack_msg.hp_this_node, _meta_servers.group_host_port()->leader()); if (ERR_OK != err) { - host_port next = _meta_servers.group_host_port()->next(this_node_hp); - if (next != this_node_hp) { + host_port next = _meta_servers.group_host_port()->next(ack_msg.hp_this_node); + if (next != ack_msg.hp_this_node) { _meta_servers.group_host_port()->set_leader(next); // do not start next send_beacon() immediately to avoid send rpc too frequently - switch_master(this_node_hp, next, 1000); + switch_master(ack_msg.hp_this_node, next, 1000); } } else { - if (ack.is_master) { + if (ack_msg.is_master) { // do nothing - } else if (ack.primary_node.is_invalid()) { - host_port next = _meta_servers.group_host_port()->next(this_node_hp); - if (next != this_node_hp) { + } else if (ack_msg.primary_node.is_invalid()) { + host_port next = _meta_servers.group_host_port()->next(ack_msg.hp_this_node); + if (next != ack_msg.hp_this_node) { _meta_servers.group_host_port()->set_leader(next); // do not start next send_beacon() immediately to avoid send rpc too frequently - switch_master(this_node_hp, next, 1000); + switch_master(ack_msg.hp_this_node, next, 1000); } } else { - _meta_servers.group_host_port()->set_leader(primary_node_hp); + _meta_servers.group_host_port()->set_leader(ack_msg.hp_primary_node); // start next send_beacon() immediately because the leader is possibly right. - switch_master(this_node_hp, primary_node_hp, 0); + switch_master(ack_msg.hp_this_node, ack_msg.hp_primary_node, 0); } } } diff --git a/src/failure_detector/fd.thrift b/src/failure_detector/fd.thrift index 96f208dd29..12054106ea 100644 --- a/src/failure_detector/fd.thrift +++ b/src/failure_detector/fd.thrift @@ -34,8 +34,8 @@ struct beacon_msg 2: dsn.rpc_address from_addr; 3: dsn.rpc_address to_addr; 4: optional i64 start_time; - 5: optional dsn.host_port host_port_from; - 6: optional dsn.host_port host_port_to; + 5: optional dsn.host_port hp_from_addr; + 6: optional dsn.host_port hp_to_addr; } struct beacon_ack @@ -45,13 +45,13 @@ struct beacon_ack 3: dsn.rpc_address primary_node; 4: bool is_master; 5: bool allowed; - 6: optional dsn.host_port host_port_this_node; - 7: optional dsn.host_port host_port_primary_node; + 6: optional dsn.host_port hp_this_node; + 7: optional dsn.host_port hp_primary_node; } struct config_master_message { 1: dsn.rpc_address master; 2: bool is_register; - 3: optional dsn.host_port host_port_master; + 3: optional dsn.host_port hp_master; } diff --git a/src/failure_detector/test/failure_detector.cpp b/src/failure_detector/test/failure_detector.cpp index 0fc1facbb7..ab5b8ab4c4 100644 --- a/src/failure_detector/test/failure_detector.cpp +++ b/src/failure_detector/test/failure_detector.cpp @@ -212,8 +212,8 @@ class test_worker : public service_app, public serverlet request.is_register ? "reg" : "unreg"); host_port master; - if (request.__isset.host_port_master) { - master = request.host_port_master; + if (request.__isset.hp_master) { + master = request.hp_master; } else { master = host_port(request.master); } @@ -333,7 +333,7 @@ void worker_set_leader(test_worker *worker, int leader_contact) config_master_message msg; msg.master = rpc_address("localhost", MPORT_START + leader_contact); msg.is_register = true; - msg.__set_host_port_master(host_port(msg.master)); + msg.__set_hp_master(host_port(msg.master)); error_code err; bool response; std::tie(err, response) = rpc::call_wait( @@ -349,7 +349,7 @@ void clear(test_worker *worker, std::vector masters) config_master_message msg; msg.master = leader; msg.is_register = false; - msg.__set_host_port_master(hp); + msg.__set_hp_master(hp); error_code err; bool response; std::tie(err, response) = rpc::call_wait( @@ -671,16 +671,16 @@ TEST(fd, update_stability) msg.time = dsn_now_ms(); msg.__isset.start_time = true; msg.start_time = 1000; - msg.__set_host_port_from(host_port("localhost", 123)); - msg.__set_host_port_to(host_port("localhost", MPORT_START)); + msg.__set_hp_from_addr(host_port("localhost", 123)); + msg.__set_hp_to_addr(host_port("localhost", MPORT_START)); // first on ping fd->on_ping(msg, r); ASSERT_EQ(1, smap->size()); - ASSERT_NE(smap->end(), smap->find(msg.host_port_from)); + ASSERT_NE(smap->end(), smap->find(msg.hp_from_addr)); replication::meta_server_failure_detector::worker_stability &ws = - smap->find(msg.host_port_from)->second; + smap->find(msg.hp_from_addr)->second; ASSERT_EQ(0, ws.unstable_restart_count); ASSERT_EQ(msg.start_time, ws.last_start_time_ms); ASSERT_TRUE(r.is_empty()); @@ -748,7 +748,7 @@ TEST(fd, update_stability) ASSERT_FALSE(r.is_empty()); // reset stat - fd->reset_stability_stat(msg.host_port_from); + fd->reset_stability_stat(msg.hp_from_addr); ASSERT_EQ(msg.start_time, ws.last_start_time_ms); ASSERT_EQ(0, ws.unstable_restart_count); } diff --git a/src/meta/meta_server_failure_detector.cpp b/src/meta/meta_server_failure_detector.cpp index b03a8d45b1..b131b029f3 100644 --- a/src/meta/meta_server_failure_detector.cpp +++ b/src/meta/meta_server_failure_detector.cpp @@ -233,16 +233,13 @@ bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b { zauto_lock l(_map_lock); - host_port node; - if (beacon.__isset.host_port_from) { - node = beacon.host_port_from; - } else { - node = host_port(beacon.from_addr); - } + fd::beacon_msg msg = beacon; + FILL_HP_OPTIONAL_SECTION(msg, from_addr); + FILL_HP_OPTIONAL_SECTION(msg, to_addr); - auto iter = _stablity.find(node); + auto iter = _stablity.find(msg.hp_from_addr); if (iter == _stablity.end()) { - _stablity.emplace(node, worker_stability{beacon.start_time, 0}); + _stablity.emplace(msg.hp_from_addr, worker_stability{msg.start_time, 0}); return true; } else { worker_stability &w = iter->second; @@ -287,58 +284,42 @@ bool meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon, rpc_replier &reply) { - if (beacon.__isset.start_time && !update_stability_stat(beacon)) { - LOG_WARNING("{} is unstable, don't response to it's beacon", host_port(beacon.from_addr)); - return; - } + fd::beacon_msg msg = beacon; + FILL_HP_OPTIONAL_SECTION(msg, from_addr); + FILL_HP_OPTIONAL_SECTION(msg, to_addr); - rpc_address this_node_addr; - host_port this_node_hp; - if (beacon.__isset.host_port_to) { - this_node_addr = _dns_resolver->resolve_address(beacon.host_port_to); - this_node_hp = beacon.host_port_to; - } else { - this_node_addr = beacon.to_addr; - this_node_hp = host_port(beacon.to_addr); + if (msg.__isset.start_time && !update_stability_stat(msg)) { + LOG_WARNING("{}({}) is unstable, don't response to it's beacon", msg.from_addr, msg.hp_from_addr); + return; } fd::beacon_ack ack; - ack.time = beacon.time; - ack.this_node = this_node_addr; + ack.time = msg.time; + ack.this_node = msg.to_addr; ack.allowed = true; - ack.__set_host_port_this_node(this_node_hp); + ack.__set_hp_this_node(msg.hp_to_addr); dsn::host_port leader; if (!get_leader(&leader)) { ack.is_master = false; ack.primary_node = _dns_resolver->resolve_address(leader); - ack.__set_host_port_primary_node(leader); + ack.__set_hp_primary_node(leader); } else { ack.is_master = true; - ack.primary_node = this_node_addr; - ack.__set_host_port_primary_node(this_node_hp); - failure_detector::on_ping_internal(beacon, ack); + ack.primary_node = msg.to_addr; + ack.__set_hp_primary_node(msg.hp_to_addr); + failure_detector::on_ping_internal(msg, ack); } - host_port hp_from; - rpc_address addr_form; - if (beacon.__isset.host_port_from) { - hp_from = beacon.host_port_from; - addr_form = _dns_resolver->resolve_address(beacon.host_port_from); - } else { - hp_from = host_port(beacon.from_addr); - addr_form = beacon.to_addr; - } - LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}({})), this_node({}({})), " "primary_node({}({}))", ack.time, ack.is_master ? "true" : "false", - hp_from, - addr_form, - this_node_hp, - this_node_addr, - ack.host_port_primary_node, + msg.hp_from_addr, + msg.from_addr, + msg.hp_to_addr, + msg.to_addr, + ack.hp_primary_node, ack.primary_node); reply(ack); diff --git a/src/nfs/nfs.thrift b/src/nfs/nfs.thrift index c8372fdcec..706e8c6b1e 100644 --- a/src/nfs/nfs.thrift +++ b/src/nfs/nfs.thrift @@ -40,7 +40,7 @@ struct copy_request 8: bool overwrite; 9: optional string source_disk_tag; 10: optional dsn.gpid pid; - 11: optional dsn.host_port source_host_port; + 11: optional dsn.host_port hp_source; } struct copy_response @@ -61,7 +61,7 @@ struct get_file_size_request 6: optional string source_disk_tag; 7: optional string dest_disk_tag; 8: optional dsn.gpid pid; - 9: optional dsn.host_port source_host_port; + 9: optional dsn.host_port hp_source; } struct get_file_size_response diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp index 6d36bf7afc..2055806e93 100644 --- a/src/nfs/nfs_client_impl.cpp +++ b/src/nfs/nfs_client_impl.cpp @@ -140,7 +140,7 @@ void nfs_client_impl::begin_remote_copy(std::shared_ptr &rc req->file_size_req.__set_source_disk_tag(rci->source_disk_tag); req->file_size_req.__set_dest_disk_tag(rci->dest_disk_tag); req->file_size_req.__set_pid(rci->pid); - req->file_size_req.__set_source_host_port(rci->source); + req->file_size_req.__set_hp_source(rci->source); req->nfs_task = nfs_task; req->is_finished = false; @@ -156,15 +156,9 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err, const ::dsn::service::get_file_size_response &resp, const user_request_ptr &ureq) { - host_port hp; - if (ureq->file_size_req.__isset.source_host_port) { - hp = ureq->file_size_req.source_host_port; - } else { - hp = host_port(ureq->file_size_req.source); - } if (err != ::dsn::ERR_OK) { LOG_ERROR("[nfs_service] remote get file size failed, source = {}({}), dir = {}, err = {}", - hp, + ureq->file_size_req.hp_source, ureq->file_size_req.source, ureq->file_size_req.source_dir, err); @@ -175,7 +169,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err, err = dsn::error_code(resp.error); if (err != ::dsn::ERR_OK) { LOG_ERROR("[nfs_service] remote get file size failed, source = {}({}), dir = {}, err = {}", - hp, + ureq->file_size_req.hp_source, ureq->file_size_req.source, ureq->file_size_req.source_dir, err); @@ -306,11 +300,7 @@ void nfs_client_impl::continue_copy() copy_req.is_last = req->is_last; copy_req.__set_source_disk_tag(ureq->file_size_req.source_disk_tag); copy_req.__set_pid(ureq->file_size_req.pid); - if (ureq->file_size_req.__isset.source_host_port) { - copy_req.__set_source_host_port(ureq->file_size_req.source_host_port); - } else { - copy_req.__set_source_host_port(host_port(copy_req.source)); - } + copy_req.__set_hp_source(ureq->file_size_req.__isset.hp_source ? ureq->file_size_req.hp_source : host_port(ureq->file_size_req.source)); req->remote_copy_task = async_nfs_copy(copy_req, [=](error_code err, copy_response &&resp) { @@ -358,8 +348,8 @@ void nfs_client_impl::end_copy(::dsn::error_code err, if (!fc->user_req->is_finished) { host_port hp; - if (fc->user_req->file_size_req.__isset.source_host_port) { - hp = fc->user_req->file_size_req.source_host_port; + if (fc->user_req->file_size_req.__isset.hp_source) { + hp = fc->user_req->file_size_req.hp_source; } else { hp = host_port(fc->user_req->file_size_req.source); }