Skip to content

Commit

Permalink
[#18788] docdb: add RPC to get heartbeat update delays of master foll…
Browse files Browse the repository at this point in the history
…owers

Summary:
This diff adds a new RPC to the `MasterAdmin` to get the number of milliseconds since the master leader has successfully processed a consensus update from each of the master followers. The implementation just plumbs through the `last_successful_communication_time` field of the consensus queue up. This is the same field used by leaders to decide the health of a peer and whether to evict a peer (although masters do not evict peer masters).

I intend to do a little more cleanup work on the unit tests, but I wanted to get out a diff for review sooner.
Jira: DB-7670

Test Plan:
```
ybd --cxx-test tablet_health_manager-itest --gtest_filter '*GetFollowerUpdateDelay*'
```

Reviewers: asrivastava, rahuldesirazu

Reviewed By: asrivastava

Subscribers: ybase, bogdan, slingam

Differential Revision: https://phorge.dev.yugabyte.com/D30479
  • Loading branch information
druzac committed Dec 4, 2023
1 parent 5fbfa76 commit 2dfc818
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 9 deletions.
11 changes: 7 additions & 4 deletions src/yb/consensus/consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "yb/common/entity_ids_types.h"

#include "yb/consensus/consensus_fwd.h"
#include "yb/consensus/consensus_types.h"
#include "yb/consensus/consensus_types.pb.h"
#include "yb/consensus/metadata.pb.h"

Expand Down Expand Up @@ -170,7 +171,7 @@ class Consensus {

// Implement a LeaderStepDown() request.
virtual Status StepDown(const LeaderStepDownRequestPB* req,
LeaderStepDownResponsePB* resp);
LeaderStepDownResponsePB* resp);

// Wait until the node has LEADER role.
// Returns Status::TimedOut if the role is not LEADER within 'timeout'.
Expand Down Expand Up @@ -244,17 +245,19 @@ class Consensus {
// Messages sent from CANDIDATEs to voting peers to request their vote
// in leader election.
virtual Status RequestVote(const VoteRequestPB* request,
VoteResponsePB* response) = 0;
VoteResponsePB* response) = 0;

// Implement a ChangeConfig() request.
virtual Status ChangeConfig(const ChangeConfigRequestPB& req,
const StdStatusCallback& client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error);
const StdStatusCallback& client_cb,
boost::optional<tserver::TabletServerErrorPB::Code>* error);

virtual Status UnsafeChangeConfig(
const UnsafeChangeConfigRequestPB& req,
boost::optional<tserver::TabletServerErrorPB::Code>* error_code) = 0;

virtual std::vector<FollowerCommunicationTime> GetFollowerCommunicationTimes() = 0;

// Returns the current Raft role of this instance.
virtual PeerRole role() const = 0;

Expand Down
13 changes: 13 additions & 0 deletions src/yb/consensus/consensus_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1744,5 +1744,18 @@ Result<OpId> PeerMessageQueue::TEST_GetLastOpIdWithType(
return log_cache_.TEST_GetLastOpIdWithType(max_allowed_index, op_type);
}

std::vector<FollowerCommunicationTime> PeerMessageQueue::GetFollowerCommunicationTimes() const {
std::vector<FollowerCommunicationTime> result;
std::lock_guard lock(queue_lock_);
result.reserve(peers_map_.size());
for (const auto& [peer_uuid, peer] : peers_map_) {
if (peer_uuid == local_peer_uuid_) {
continue;
}
result.emplace_back(peer_uuid, peer->last_successful_communication_time);
}
return result;
}

} // namespace consensus
} // namespace yb
5 changes: 4 additions & 1 deletion src/yb/consensus/consensus_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
#include "yb/common/placement_info.h"

#include "yb/consensus/consensus_fwd.h"
#include "yb/consensus/metadata.pb.h"
#include "yb/consensus/consensus_types.h"
#include "yb/consensus/log_cache.h"
#include "yb/consensus/metadata.pb.h"
#include "yb/consensus/opid_util.h"

#include "yb/gutil/ref_counted.h"
Expand Down Expand Up @@ -415,6 +416,8 @@ class PeerMessageQueue {

Result<OpId> TEST_GetLastOpIdWithType(int64_t max_allowed_index, OperationType op_type);

std::vector<FollowerCommunicationTime> GetFollowerCommunicationTimes() const;

private:
FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
FRIEND_TEST(ConsensusQueueTest, TestReadReplicatedMessagesForCDC);
Expand Down
9 changes: 9 additions & 0 deletions src/yb/consensus/consensus_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,14 @@ struct ConsensusOptions {
std::string tablet_id;
};

// Return value for GetFollowerCommunicationTimes.
struct FollowerCommunicationTime {
std::string peer_uuid;
MonoTime last_successful_communication;

explicit FollowerCommunicationTime(std::string peer_uuid, MonoTime last_successful_communication)
: peer_uuid(peer_uuid), last_successful_communication(last_successful_communication) {}
};

} // namespace consensus
} // namespace yb
4 changes: 4 additions & 0 deletions src/yb/consensus/raft_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2836,6 +2836,10 @@ Status RaftConsensus::UnsafeChangeConfig(
return s;
}

std::vector<FollowerCommunicationTime> RaftConsensus::GetFollowerCommunicationTimes() {
return queue_->GetFollowerCommunicationTimes();
}

void RaftConsensus::Shutdown() {
LOG_WITH_PREFIX(INFO) << "Shutdown.";

Expand Down
2 changes: 2 additions & 0 deletions src/yb/consensus/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
const UnsafeChangeConfigRequestPB& req,
boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;

std::vector<FollowerCommunicationTime> GetFollowerCommunicationTimes() override;

PeerRole GetRoleUnlocked() const;

PeerRole role() const override;
Expand Down
50 changes: 50 additions & 0 deletions src/yb/integration-tests/tablet_health_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
#include "yb/integration-tests/external_mini_cluster-itest-base.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"
#include "yb/integration-tests/yb_table_test_base.h"

#include "yb/master/master_admin.pb.h"
#include "yb/master/master_admin.proxy.h"
#include "yb/master/sys_catalog_constants.h"

#include "yb/tools/yb-admin_client.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/monotime.h"
#include "yb/util/test_macros.h"
Expand Down Expand Up @@ -231,5 +236,50 @@ TEST_F(AreNodesSafeToTakeDownRf5Itest, TserverLagging) {
{bad_tserver->uuid(), good_tserver1->uuid()}, {}, kFollowerLagBoundMs));
}

TEST_F(AreNodesSafeToTakeDownItest, GetFollowerUpdateDelay) {
auto leader_master = external_mini_cluster_->GetLeaderMaster();
auto proxy = external_mini_cluster_->GetProxy<master::MasterAdminProxy>(leader_master);
master::GetMasterHeartbeatDelaysRequestPB req;
master::GetMasterHeartbeatDelaysResponsePB resp;
rpc::RpcController rpc;
ASSERT_OK(proxy.GetMasterHeartbeatDelays(req, &resp, &rpc));
ASSERT_EQ(resp.heartbeat_delay_size(), 2);
auto max_expected_heartbeat_time = FLAGS_raft_heartbeat_interval_ms * 2;
for (const auto& heartbeat_delay : resp.heartbeat_delay()) {
ASSERT_LE(heartbeat_delay.last_heartbeat_delta_ms(), max_expected_heartbeat_time);
}
}

// Stop one of the nodes and ensure its delay increases.
TEST_F(AreNodesSafeToTakeDownItest, GetFollowerUpdateDelayWithStoppedNode) {
auto leader_master = external_mini_cluster_->GetLeaderMaster();
auto proxy = external_mini_cluster_->GetProxy<master::MasterAdminProxy>(leader_master);
auto masters = external_mini_cluster_->master_daemons();
auto master_it =
std::find_if(masters.begin(), masters.end(), [leader_master](const auto master) -> bool {
return master->uuid() != leader_master->uuid();
});
ASSERT_NE(master_it, masters.end());
auto other_master = *master_it;
other_master->Shutdown();
auto sleep_time = FLAGS_raft_heartbeat_interval_ms * 3;
SleepFor(MonoDelta::FromMilliseconds(sleep_time));
master::GetMasterHeartbeatDelaysRequestPB req;
master::GetMasterHeartbeatDelaysResponsePB resp;
rpc::RpcController rpc;
ASSERT_OK(proxy.GetMasterHeartbeatDelays(req, &resp, &rpc));
ASSERT_EQ(resp.heartbeat_delay_size(), 2);
auto max_expected_heartbeat_time = FLAGS_raft_heartbeat_interval_ms * 2;
for (const auto& heartbeat_delay : resp.heartbeat_delay()) {
if (heartbeat_delay.master_uuid() != other_master->uuid()) {
ASSERT_LE(heartbeat_delay.last_heartbeat_delta_ms(), max_expected_heartbeat_time);
} else {
ASSERT_GE(heartbeat_delay.last_heartbeat_delta_ms(), sleep_time);
}
}

ASSERT_OK(other_master->Restart());
}

} // namespace integration_tests
} // namespace yb
14 changes: 14 additions & 0 deletions src/yb/master/master_admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ message AreNodesSafeToTakeDownResponsePB {
optional MasterErrorPB error = 1;
}

message GetMasterHeartbeatDelaysRequestPB { }

message GetMasterHeartbeatDelaysResponsePB {
message MasterHeartbeatDelay {
optional bytes master_uuid = 1;
optional int64 last_heartbeat_delta_ms = 2;
}

optional MasterErrorPB error = 1;
repeated MasterHeartbeatDelay heartbeat_delay = 2;
}

service MasterAdmin {
option (yb.rpc.custom_service_name) = "yb.master.MasterService";

Expand Down Expand Up @@ -251,6 +263,8 @@ service MasterAdmin {

rpc AreNodesSafeToTakeDown(AreNodesSafeToTakeDownRequestPB)
returns (AreNodesSafeToTakeDownResponsePB);
rpc GetMasterHeartbeatDelays(GetMasterHeartbeatDelaysRequestPB)
returns (GetMasterHeartbeatDelaysResponsePB);

rpc DdlLog(DdlLogRequestPB) returns (DdlLogResponsePB);

Expand Down
1 change: 1 addition & 0 deletions src/yb/master/master_admin_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class MasterAdminServiceImpl : public MasterServiceBase, public MasterAdminIf {
MASTER_SERVICE_IMPL_ON_LEADER_WITH_LOCK(
TabletHealthManager,
(AreNodesSafeToTakeDown)
(GetMasterHeartbeatDelays)
)

MASTER_SERVICE_IMPL_ON_ALL_MASTERS(
Expand Down
30 changes: 26 additions & 4 deletions src/yb/master/tablet_health_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
#include "yb/common/entity_ids_types.h"
#include "yb/common/wire_protocol.h"
#include "yb/common/wire_protocol.pb.h"

#include "yb/consensus/consensus.h"
#include "yb/consensus/metadata.pb.h"
#include "yb/consensus/raft_consensus.h"

#include "yb/master/async_rpc_tasks.h"
#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_manager-internal.h"
Expand All @@ -41,9 +43,12 @@
#include "yb/master/sys_catalog_constants.h"
#include "yb/master/ts_descriptor.h"
#include "yb/master/ts_manager.h"

#include "yb/rpc/rpc_context.h"

#include "yb/tablet/tablet_peer.h"
#include "yb/tserver/tserver.pb.h"

#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"
#include "yb/util/monotime.h"
Expand Down Expand Up @@ -304,12 +309,11 @@ Status TabletHealthManager::AreNodesSafeToTakeDown(

Status TabletHealthManager::CheckMasterTabletHealth(
const CheckMasterTabletHealthRequestPB *req, CheckMasterTabletHealthResponsePB *resp) {
auto peer = catalog_manager_->tablet_peer();
auto consensus = VERIFY_RESULT(peer->GetRaftConsensus());
if (!consensus) {
auto consensus_result = catalog_manager_->tablet_peer()->GetRaftConsensus();
if (!consensus_result) {
return STATUS_FORMAT(IllegalState, "Could not get sys catalog tablet consensus");
}

auto& consensus = *consensus_result;
auto role = consensus->role();
resp->set_role(role);
if (role != PeerRole::LEADER) {
Expand All @@ -318,5 +322,23 @@ Status TabletHealthManager::CheckMasterTabletHealth(
return Status::OK();
}

Status TabletHealthManager::GetMasterHeartbeatDelays(
const GetMasterHeartbeatDelaysRequestPB* req, GetMasterHeartbeatDelaysResponsePB* resp) {
auto consensus_result = catalog_manager_->tablet_peer()->GetConsensus();
if (!consensus_result) {
return STATUS_FORMAT(IllegalState, "Could not get sys catalog tablet consensus");
}
auto& consensus = *consensus_result;
auto now = MonoTime::Now();
for (auto& last_communication_time : consensus->GetFollowerCommunicationTimes()) {
auto* heartbeat_delay = resp->add_heartbeat_delay();
heartbeat_delay->set_master_uuid(std::move(last_communication_time.peer_uuid));
heartbeat_delay->set_last_heartbeat_delta_ms(
now.GetDeltaSince(last_communication_time.last_successful_communication)
.ToMilliseconds());
}
return Status::OK();
}

} // namespace master
} // namespace yb
3 changes: 3 additions & 0 deletions src/yb/master/tablet_health_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ class TabletHealthManager {
Status CheckMasterTabletHealth(
const CheckMasterTabletHealthRequestPB* req, CheckMasterTabletHealthResponsePB* resp);

Status GetMasterHeartbeatDelays(
const GetMasterHeartbeatDelaysRequestPB* req, GetMasterHeartbeatDelaysResponsePB* resp);

private:
Master* master_;
CatalogManagerIf* catalog_manager_;
Expand Down

0 comments on commit 2dfc818

Please sign in to comment.