diff --git a/src/yb/consensus/consensus.h b/src/yb/consensus/consensus.h index 42482711bda7..040509bd5964 100644 --- a/src/yb/consensus/consensus.h +++ b/src/yb/consensus/consensus.h @@ -41,6 +41,7 @@ #include "yb/common/entity_ids_types.h" #include "yb/consensus/consensus_fwd.h" +#include "yb/consensus/consensus_types.h" #include "yb/consensus/consensus_types.pb.h" #include "yb/consensus/metadata.pb.h" @@ -170,7 +171,7 @@ class Consensus { // Implement a LeaderStepDown() request. virtual Status StepDown(const LeaderStepDownRequestPB* req, - LeaderStepDownResponsePB* resp); + LeaderStepDownResponsePB* resp); // Wait until the node has LEADER role. // Returns Status::TimedOut if the role is not LEADER within 'timeout'. @@ -244,17 +245,19 @@ class Consensus { // Messages sent from CANDIDATEs to voting peers to request their vote // in leader election. virtual Status RequestVote(const VoteRequestPB* request, - VoteResponsePB* response) = 0; + VoteResponsePB* response) = 0; // Implement a ChangeConfig() request. virtual Status ChangeConfig(const ChangeConfigRequestPB& req, - const StdStatusCallback& client_cb, - boost::optional* error); + const StdStatusCallback& client_cb, + boost::optional* error); virtual Status UnsafeChangeConfig( const UnsafeChangeConfigRequestPB& req, boost::optional* error_code) = 0; + virtual std::vector GetFollowerCommunicationTimes() = 0; + // Returns the current Raft role of this instance. virtual PeerRole role() const = 0; diff --git a/src/yb/consensus/consensus_queue.cc b/src/yb/consensus/consensus_queue.cc index 118ff741b78c..aa3c32f2be7f 100644 --- a/src/yb/consensus/consensus_queue.cc +++ b/src/yb/consensus/consensus_queue.cc @@ -1744,5 +1744,18 @@ Result PeerMessageQueue::TEST_GetLastOpIdWithType( return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type); } +std::vector PeerMessageQueue::GetFollowerCommunicationTimes() const { + std::vector result; + std::lock_guard lock(queue_lock_); + result.reserve(peers_map_.size()); + for (const auto& [peer_uuid, peer] : peers_map_) { + if (peer_uuid == local_peer_uuid_) { + continue; + } + result.emplace_back(peer_uuid, peer->last_successful_communication_time); + } + return result; +} + } // namespace consensus } // namespace yb diff --git a/src/yb/consensus/consensus_queue.h b/src/yb/consensus/consensus_queue.h index 193040ff7295..93e45cbb4726 100644 --- a/src/yb/consensus/consensus_queue.h +++ b/src/yb/consensus/consensus_queue.h @@ -45,8 +45,9 @@ #include "yb/common/placement_info.h" #include "yb/consensus/consensus_fwd.h" -#include "yb/consensus/metadata.pb.h" +#include "yb/consensus/consensus_types.h" #include "yb/consensus/log_cache.h" +#include "yb/consensus/metadata.pb.h" #include "yb/consensus/opid_util.h" #include "yb/gutil/ref_counted.h" @@ -415,6 +416,8 @@ class PeerMessageQueue { Result TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type); + std::vector GetFollowerCommunicationTimes() const; + private: FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex); FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC); diff --git a/src/yb/consensus/consensus_types.h b/src/yb/consensus/consensus_types.h index b9f1c8baee62..55ed2564adc6 100644 --- a/src/yb/consensus/consensus_types.h +++ b/src/yb/consensus/consensus_types.h @@ -25,5 +25,14 @@ struct ConsensusOptions { std::string tablet_id; }; +// Return value for GetFollowerCommunicationTimes. +struct FollowerCommunicationTime { + std::string peer_uuid; + MonoTime last_successful_communication; + + explicit FollowerCommunicationTime(std::string peer_uuid, MonoTime last_successful_communication) + : peer_uuid(peer_uuid), last_successful_communication(last_successful_communication) {} +}; + } // namespace consensus } // namespace yb diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 80822480177b..8f79ba4644ef 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -2836,6 +2836,10 @@ Status RaftConsensus::UnsafeChangeConfig( return s; } +std::vector RaftConsensus::GetFollowerCommunicationTimes() { + return queue_->GetFollowerCommunicationTimes(); +} + void RaftConsensus::Shutdown() { LOG_WITH_PREFIX(INFO) << "Shutdown."; diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index f536dc55021b..290022976d1e 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -177,6 +177,8 @@ class RaftConsensus : public std::enable_shared_from_this, const UnsafeChangeConfigRequestPB& req, boost::optional* error_code) override; + std::vector GetFollowerCommunicationTimes() override; + PeerRole GetRoleUnlocked() const; PeerRole role() const override; diff --git a/src/yb/integration-tests/tablet_health_manager-itest.cc b/src/yb/integration-tests/tablet_health_manager-itest.cc index e7f0a76d39e0..290a8c516b75 100644 --- a/src/yb/integration-tests/tablet_health_manager-itest.cc +++ b/src/yb/integration-tests/tablet_health_manager-itest.cc @@ -20,8 +20,13 @@ #include "yb/integration-tests/external_mini_cluster-itest-base.h" #include "yb/integration-tests/yb_mini_cluster_test_base.h" #include "yb/integration-tests/yb_table_test_base.h" + +#include "yb/master/master_admin.pb.h" +#include "yb/master/master_admin.proxy.h" #include "yb/master/sys_catalog_constants.h" + #include "yb/tools/yb-admin_client.h" + #include "yb/util/backoff_waiter.h" #include "yb/util/monotime.h" #include "yb/util/test_macros.h" @@ -231,5 +236,50 @@ TEST_F(AreNodesSafeToTakeDownRf5Itest, TserverLagging) { {bad_tserver->uuid(), good_tserver1->uuid()}, {}, kFollowerLagBoundMs)); } +TEST_F(AreNodesSafeToTakeDownItest, GetFollowerUpdateDelay) { + auto leader_master = external_mini_cluster_->GetLeaderMaster(); + auto proxy = external_mini_cluster_->GetProxy(leader_master); + master::GetMasterHeartbeatDelaysRequestPB req; + master::GetMasterHeartbeatDelaysResponsePB resp; + rpc::RpcController rpc; + ASSERT_OK(proxy.GetMasterHeartbeatDelays(req, &resp, &rpc)); + ASSERT_EQ(resp.heartbeat_delay_size(), 2); + auto max_expected_heartbeat_time = FLAGS_raft_heartbeat_interval_ms * 2; + for (const auto& heartbeat_delay : resp.heartbeat_delay()) { + ASSERT_LE(heartbeat_delay.last_heartbeat_delta_ms(), max_expected_heartbeat_time); + } +} + +// Stop one of the nodes and ensure its delay increases. +TEST_F(AreNodesSafeToTakeDownItest, GetFollowerUpdateDelayWithStoppedNode) { + auto leader_master = external_mini_cluster_->GetLeaderMaster(); + auto proxy = external_mini_cluster_->GetProxy(leader_master); + auto masters = external_mini_cluster_->master_daemons(); + auto master_it = + std::find_if(masters.begin(), masters.end(), [leader_master](const auto master) -> bool { + return master->uuid() != leader_master->uuid(); + }); + ASSERT_NE(master_it, masters.end()); + auto other_master = *master_it; + other_master->Shutdown(); + auto sleep_time = FLAGS_raft_heartbeat_interval_ms * 3; + SleepFor(MonoDelta::FromMilliseconds(sleep_time)); + master::GetMasterHeartbeatDelaysRequestPB req; + master::GetMasterHeartbeatDelaysResponsePB resp; + rpc::RpcController rpc; + ASSERT_OK(proxy.GetMasterHeartbeatDelays(req, &resp, &rpc)); + ASSERT_EQ(resp.heartbeat_delay_size(), 2); + auto max_expected_heartbeat_time = FLAGS_raft_heartbeat_interval_ms * 2; + for (const auto& heartbeat_delay : resp.heartbeat_delay()) { + if (heartbeat_delay.master_uuid() != other_master->uuid()) { + ASSERT_LE(heartbeat_delay.last_heartbeat_delta_ms(), max_expected_heartbeat_time); + } else { + ASSERT_GE(heartbeat_delay.last_heartbeat_delta_ms(), sleep_time); + } + } + + ASSERT_OK(other_master->Restart()); +} + } // namespace integration_tests } // namespace yb diff --git a/src/yb/master/master_admin.proto b/src/yb/master/master_admin.proto index 6f2d553988ed..5a3c544d7ced 100644 --- a/src/yb/master/master_admin.proto +++ b/src/yb/master/master_admin.proto @@ -223,6 +223,18 @@ message AreNodesSafeToTakeDownResponsePB { optional MasterErrorPB error = 1; } +message GetMasterHeartbeatDelaysRequestPB { } + +message GetMasterHeartbeatDelaysResponsePB { + message MasterHeartbeatDelay { + optional bytes master_uuid = 1; + optional int64 last_heartbeat_delta_ms = 2; + } + + optional MasterErrorPB error = 1; + repeated MasterHeartbeatDelay heartbeat_delay = 2; +} + service MasterAdmin { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -251,6 +263,8 @@ service MasterAdmin { rpc AreNodesSafeToTakeDown(AreNodesSafeToTakeDownRequestPB) returns (AreNodesSafeToTakeDownResponsePB); + rpc GetMasterHeartbeatDelays(GetMasterHeartbeatDelaysRequestPB) + returns (GetMasterHeartbeatDelaysResponsePB); rpc DdlLog(DdlLogRequestPB) returns (DdlLogResponsePB); diff --git a/src/yb/master/master_admin_service.cc b/src/yb/master/master_admin_service.cc index 6f4b8195ffc1..530a82b6336b 100644 --- a/src/yb/master/master_admin_service.cc +++ b/src/yb/master/master_admin_service.cc @@ -65,6 +65,7 @@ class MasterAdminServiceImpl : public MasterServiceBase, public MasterAdminIf { MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK( TabletHealthManager, (AreNodesSafeToTakeDown) + (GetMasterHeartbeatDelays) ) MASTER_SERVICE_IMPL_ON_ALL_MASTERS( diff --git a/src/yb/master/tablet_health_manager.cc b/src/yb/master/tablet_health_manager.cc index f2cd0727d9f9..dc2ba0b98a59 100644 --- a/src/yb/master/tablet_health_manager.cc +++ b/src/yb/master/tablet_health_manager.cc @@ -28,9 +28,11 @@ #include "yb/common/entity_ids_types.h" #include "yb/common/wire_protocol.h" #include "yb/common/wire_protocol.pb.h" + #include "yb/consensus/consensus.h" #include "yb/consensus/metadata.pb.h" #include "yb/consensus/raft_consensus.h" + #include "yb/master/async_rpc_tasks.h" #include "yb/master/catalog_entity_info.h" #include "yb/master/catalog_manager-internal.h" @@ -41,9 +43,12 @@ #include "yb/master/sys_catalog_constants.h" #include "yb/master/ts_descriptor.h" #include "yb/master/ts_manager.h" + #include "yb/rpc/rpc_context.h" + #include "yb/tablet/tablet_peer.h" #include "yb/tserver/tserver.pb.h" + #include "yb/util/flags/flag_tags.h" #include "yb/util/logging.h" #include "yb/util/monotime.h" @@ -304,12 +309,11 @@ Status TabletHealthManager::AreNodesSafeToTakeDown( Status TabletHealthManager::CheckMasterTabletHealth( const CheckMasterTabletHealthRequestPB *req, CheckMasterTabletHealthResponsePB *resp) { - auto peer = catalog_manager_->tablet_peer(); - auto consensus = VERIFY_RESULT(peer->GetRaftConsensus()); - if (!consensus) { + auto consensus_result = catalog_manager_->tablet_peer()->GetRaftConsensus(); + if (!consensus_result) { return STATUS_FORMAT(IllegalState, "Could not get sys catalog tablet consensus"); } - + auto& consensus = *consensus_result; auto role = consensus->role(); resp->set_role(role); if (role != PeerRole::LEADER) { @@ -318,5 +322,23 @@ Status TabletHealthManager::CheckMasterTabletHealth( return Status::OK(); } +Status TabletHealthManager::GetMasterHeartbeatDelays( + const GetMasterHeartbeatDelaysRequestPB* req, GetMasterHeartbeatDelaysResponsePB* resp) { + auto consensus_result = catalog_manager_->tablet_peer()->GetConsensus(); + if (!consensus_result) { + return STATUS_FORMAT(IllegalState, "Could not get sys catalog tablet consensus"); + } + auto& consensus = *consensus_result; + auto now = MonoTime::Now(); + for (auto& last_communication_time : consensus->GetFollowerCommunicationTimes()) { + auto* heartbeat_delay = resp->add_heartbeat_delay(); + heartbeat_delay->set_master_uuid(std::move(last_communication_time.peer_uuid)); + heartbeat_delay->set_last_heartbeat_delta_ms( + now.GetDeltaSince(last_communication_time.last_successful_communication) + .ToMilliseconds()); + } + return Status::OK(); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/tablet_health_manager.h b/src/yb/master/tablet_health_manager.h index 3d985b1ebf35..29370db94c6e 100644 --- a/src/yb/master/tablet_health_manager.h +++ b/src/yb/master/tablet_health_manager.h @@ -104,6 +104,9 @@ class TabletHealthManager { Status CheckMasterTabletHealth( const CheckMasterTabletHealthRequestPB* req, CheckMasterTabletHealthResponsePB* resp); + Status GetMasterHeartbeatDelays( + const GetMasterHeartbeatDelaysRequestPB* req, GetMasterHeartbeatDelaysResponsePB* resp); + private: Master* master_; CatalogManagerIf* catalog_manager_;