Skip to content

Commit

Permalink
Merge #128426
Browse files Browse the repository at this point in the history
128426: raft: add no-op MsgFortifyLeader and MsgFortifyLeaderResp  r=nvanbenschoten a=arulajmani

See individual commits

----

This commit adds two new messages to raft -- MsgFortifyLeader and
MsgFortifyLeaderResp. A candidate attempts to fortify its leadership
term after winning an election. It does so by broadcasting a
MsgFortifyLeader to all followers.

Currently, the handling of MsgFortify is a no-op; requests are trivially
rejected. In subsequent patches, we'll hook into store liveness and
correctly respond.

Informs #125261

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
craig[bot] and arulajmani committed Aug 8, 2024
2 parents e103ae1 + bbca286 commit eb5ce7b
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 55 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,8 @@
<tr><td>STORAGE</td><td>raft.rcvd.cross_zone.bytes</td><td>Number of bytes received by this store for cross zone, same region<br/> Raft messages (when region and zone tiers are configured). If region tiers<br/> are not configured, this count may include data sent between different<br/> regions. To ensure accurate monitoring of transmitted data, it is important<br/> to set up a consistent locality configuration across nodes. Note that this<br/> does not include raft snapshot received.</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.dropped</td><td>Number of incoming Raft messages dropped (due to queue length or size)</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.dropped_bytes</td><td>Bytes of dropped incoming Raft messages</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.fortifyleader</td><td>Number of MsgFortifyLeader messages received by this store</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.fortifyleaderresp</td><td>Number of MsgFortifyLeaderResp messages received by this store</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.heartbeat</td><td>Number of (coalesced, if enabled) MsgHeartbeat messages received by this store</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.heartbeatresp</td><td>Number of (coalesced, if enabled) MsgHeartbeatResp messages received by this store</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>raft.rcvd.prevote</td><td>Number of MsgPreVote messages received by this store</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
38 changes: 26 additions & 12 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,18 @@ cache will already have moved on to newer entries.
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdFortifyLeader = metric.Metadata{
Name: "raft.rcvd.fortifyleader",
Help: "Number of MsgFortifyLeader messages received by this store",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdFortifyLeaderResp = metric.Metadata{
Name: "raft.rcvd.fortifyleaderresp",
Help: "Number of MsgFortifyLeaderResp messages received by this store",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdDropped = metric.Metadata{
Name: "raft.rcvd.dropped",
Help: "Number of incoming Raft messages dropped (due to queue length or size)",
Expand Down Expand Up @@ -3522,18 +3534,20 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {

// Raft message metrics.
RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp),
raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp),
raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp),
raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote),
raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp),
raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote),
raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp),
raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap),
raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat),
raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp),
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp),
raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp),
raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp),
raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote),
raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp),
raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote),
raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp),
raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap),
raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat),
raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp),
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
raftpb.MsgFortifyLeader: metric.NewCounter(metaRaftRcvdFortifyLeader),
raftpb.MsgFortifyLeaderResp: metric.NewCounter(metaRaftRcvdFortifyLeaderResp),
},
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// maxRaftMsgType is the maximum value in the raft.MessageType enum.
const maxRaftMsgType = raftpb.MsgForgetLeader
const maxRaftMsgType = raftpb.MsgFortifyLeaderResp

func init() {
for v := range raftpb.MessageType_name {
Expand Down
34 changes: 34 additions & 0 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ type raft struct {

// the leader id
lead pb.PeerID
// leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer
// has supported the leader in. It's unset if the peer hasn't supported the
// current leader.
//
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch pb.Epoch
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -690,6 +694,14 @@ func (r *raft) bcastHeartbeat() {
})
}

// bcastFortify sends an RPC to fortify the leader to all peers (including the leader itself).
func (r *raft) bcastFortify() {
assertTrue(r.state == StateLeader, "only leaders can fortify")

// TODO(arul): this needs to be hooked up to a version check. For now, treat
// it as a no-op.
}

func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
oldApplied := r.raftLog.applied
newApplied := max(index, oldApplied)
Expand Down Expand Up @@ -1326,6 +1338,8 @@ func stepLeader(r *raft, m pb.Message) error {

case pb.MsgForgetLeader:
return nil // noop on leader
case pb.MsgFortifyLeaderResp:
r.handleFortifyResp(m)
}

// All other message types require a progress for m.From (pr).
Expand Down Expand Up @@ -1610,6 +1624,9 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleFortify(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
Expand All @@ -1619,6 +1636,7 @@ func stepCandidate(r *raft, m pb.Message) error {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastFortify()
r.bcastAppend()
}
case quorum.VoteLost:
Expand Down Expand Up @@ -1664,6 +1682,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
Expand Down Expand Up @@ -1829,6 +1849,20 @@ func (r *raft) handleSnapshot(m pb.Message) {
}
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
}

func (r *raft) handleFortifyResp(m pb.Message) {
assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses")
assertTrue(m.Reject, "TODO(arul): implement")
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/raftpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ enum MessageType {
MsgStorageApply = 21;
MsgStorageApplyResp = 22;
MsgForgetLeader = 23;
MsgFortifyLeader = 24;
MsgFortifyLeaderResp = 25;
// NOTE: when adding new message types, remember to update the isLocalMsg and
// isResponseMsg arrays in raft/util.go and update the corresponding tests in
// raft/util_test.go.
Expand Down
Loading

0 comments on commit eb5ce7b

Please sign in to comment.