Skip to content

Commit

Permalink
raft: move Ready constructor to rawnode.go
Browse files Browse the repository at this point in the history
Pure code movement.

Eliminates asyncStorageWrites handling in node.go.

Signed-off-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
nvanbenschoten committed Nov 2, 2022
1 parent 865df2a commit fe4eb46
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 240 deletions.
239 changes: 0 additions & 239 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,242 +571,3 @@ func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64)
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

// TODO(nvanbenschoten): move this function and the functions below it to
// rawnode.go.
func newReady(r *raft, asyncStorageWrites bool, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.nextUnstableEnts(),
CommittedEntries: r.raftLog.nextCommittedEnts(!asyncStorageWrites),
Messages: r.msgs,
}
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
}
if r.raftLog.hasNextUnstableSnapshot() {
rd.Snapshot = *r.raftLog.nextUnstableSnapshot()
}
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))

if asyncStorageWrites {
// If async storage writes are enabled, enqueue messages to
// local storage threads, where applicable.
if needStorageAppend(rd, len(r.msgsAfterAppend) > 0) {
m := newStorageAppendMsg(r, rd)
rd.Messages = append(rd.Messages, m)
}
if needStorageApply(rd) {
m := newStorageApplyMsg(r, rd)
rd.Messages = append(rd.Messages, m)
}
} else {
// If async storage writes are disabled, immediately enqueue
// msgsAfterAppend to be sent out. The Ready struct contract
// mandates that Messages cannot be sent until after Entries
// are written to stable storage.
for _, m := range r.msgsAfterAppend {
if m.To != r.id {
rd.Messages = append(rd.Messages, m)
}
}
}

return rd
}

// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
// Persistent state on all servers:
// (Updated on stable storage before responding to RPCs)
// currentTerm
// votedFor
// log entries[]
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

func needStorageAppend(rd Ready, haveMsgsAfterAppend bool) bool {
// Return true if log entries, hard state, or a snapshot need to be written
// to stable storage. Also return true if any messages are contingent on all
// prior MsgStorageAppend being processed.
return len(rd.Entries) > 0 ||
!IsEmptyHardState(rd.HardState) ||
!IsEmptySnap(rd.Snapshot) ||
haveMsgsAfterAppend
}

// newStorageAppendMsg creates the message that should be sent to the local
// append thread to instruct it to append log entries, write an updated hard
// state, and apply a snapshot. The message also carries a set of responses
// that should be delivered after the rest of the message is processed. Used
// with AsyncStorageWrites.
func newStorageAppendMsg(r *raft, rd Ready) pb.Message {
m := pb.Message{
Type: pb.MsgStorageAppend,
To: LocalAppendThread,
From: r.id,
Term: r.Term,
Entries: rd.Entries,
}
if !IsEmptyHardState(rd.HardState) {
hs := rd.HardState
m.HardState = &hs
}
if !IsEmptySnap(rd.Snapshot) {
snap := rd.Snapshot
m.Snapshot = &snap
}
// Attach all messages in msgsAfterAppend as responses to be delivered after
// the message is processed, along with a self-directed MsgStorageAppendResp
// to acknowledge the entry stability.
//
// NB: it is important for performance that MsgStorageAppendResp message be
// handled after self-directed MsgAppResp messages on the leader (which will
// be contained in msgsAfterAppend). This ordering allows the MsgAppResp
// handling to use a fast-path in r.raftLog.term() before the newly appended
// entries are removed from the unstable log.
m.Responses = r.msgsAfterAppend
m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd))
return m
}

// newStorageAppendRespMsg creates the message that should be returned to node
// after the unstable log entries, hard state, and snapshot in the current Ready
// (along with those in all prior Ready structs) have been saved to stable
// storage.
func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message {
m := pb.Message{
Type: pb.MsgStorageAppendResp,
To: r.id,
From: LocalAppendThread,
// Dropped after term change, see below.
Term: r.Term,
}
if r.raftLog.hasNextOrInProgressUnstableEnts() {
// If the raft log has unstable entries, attach the last index and term to the
// response message. This (index, term) tuple will be handed back and consulted
// when the stability of those log entries is signaled to the unstable. If the
// (index, term) match the unstable log by the time the response is received,
// the unstable log can be truncated.
//
// However, with just this logic, there would be an ABA problem that could lead
// to the unstable log and the stable log getting out of sync temporarily and
// leading to an inconsistent view. Consider the following example with 5 nodes,
// A B C D E:
//
// 1. A is the leader.
// 2. A proposes some log entries but only B receives these entries.
// 3. B gets the Ready and the entries are appended asynchronously.
// 4. A crashes and C becomes leader after getting a vote from D and E.
// 5. C proposes some log entries and B receives these entries, overwriting the
// previous unstable log entries that are in the process of being appended.
// The entries have a larger term than the previous entries but the same
// indexes. It begins appending these new entries asynchronously.
// 6. C crashes and A restarts and becomes leader again after getting the vote
// from D and E.
// 7. B receives the entries from A which are the same as the ones from step 2,
// overwriting the previous unstable log entries that are in the process of
// being appended from step 5. The entries have the original terms and
// indexes from step 2. Recall that log entries retain their original term
// numbers when a leader replicates entries from previous terms. It begins
// appending these new entries asynchronously.
// 8. The asynchronous log appends from the first Ready complete and stableTo
// is called.
// 9. However, the log entries from the second Ready are still in the
// asynchronous append pipeline and will overwrite (in stable storage) the
// entries from the first Ready at some future point. We can't truncate the
// unstable log yet or a future read from Storage might see the entries from
// step 5 before they have been replaced by the entries from step 7.
// Instead, we must wait until we are sure that the entries are stable and
// that no in-progress appends might overwrite them before removing entries
// from the unstable log.
//
// To prevent these kinds of problems, we also attach the current term to the
// MsgStorageAppendResp (above). If the term has changed by the time the
// MsgStorageAppendResp if returned, the response is ignored and the unstable
// log is not truncated. The unstable log is only truncated when the term has
// remained unchanged from the time that the MsgStorageAppend was sent to the
// time that the MsgStorageAppendResp is received, indicating that no-one else
// is in the process of truncating the stable log.
//
// However, this replaces a correctness problem with a liveness problem. If we
// only attempted to truncate the unstable log when appending new entries but
// also occasionally dropped these responses, then quiescence of new log entries
// could lead to the unstable log never being truncated.
//
// To combat this, we attempt to truncate the log on all MsgStorageAppendResp
// messages where the unstable log is not empty, not just those associated with
// entry appends. This includes MsgStorageAppendResp messages associated with an
// updated HardState, which occur after a term change.
//
// In other words, we set Index and LogTerm in a block that looks like:
//
// if r.raftLog.hasNextOrInProgressUnstableEnts() { ... }
//
// not like:
//
// if len(rd.Entries) > 0 { ... }
//
// To do so, we attach r.raftLog.lastIndex() and r.raftLog.lastTerm(), not the
// (index, term) of the last entry in rd.Entries. If rd.Entries is not empty,
// these will be the same. However, if rd.Entries is empty, we still want to
// attest that this (index, term) is correct at the current term, in case the
// MsgStorageAppend that contained the last entry in the unstable slice carried
// an earlier term and was dropped.
m.Index = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
}
if !IsEmptySnap(rd.Snapshot) {
snap := rd.Snapshot
m.Snapshot = &snap
}
return m
}

func needStorageApply(rd Ready) bool {
return len(rd.CommittedEntries) > 0
}

// newStorageApplyMsg creates the message that should be sent to the local
// apply thread to instruct it to apply committed log entries. The message
// also carries a response that should be delivered after the rest of the
// message is processed. Used with AsyncStorageWrites.
func newStorageApplyMsg(r *raft, rd Ready) pb.Message {
ents := rd.CommittedEntries
last := ents[len(ents)-1].Index
return pb.Message{
Type: pb.MsgStorageApply,
To: LocalApplyThread,
From: r.id,
Term: 0, // committed entries don't apply under a specific term
Entries: ents,
Index: last,
Responses: []pb.Message{
newStorageApplyRespMsg(r, ents),
},
}
}

// newStorageApplyRespMsg creates the message that should be returned to node
// after the committed entries in the current Ready (along with those in all
// prior Ready structs) have been applied to the local state machine.
func newStorageApplyRespMsg(r *raft, committedEnts []pb.Entry) pb.Message {
last := committedEnts[len(committedEnts)-1].Index
size := r.getUncommittedSize(committedEnts)
return pb.Message{
Type: pb.MsgStorageApplyResp,
To: r.id,
From: LocalApplyThread,
Term: 0, // committed entries don't apply under a specific term
Index: last,
// NOTE: we abuse the LogTerm field to store the aggregate entry size so
// that we don't need to introduce a new field on Message.
LogTerm: size,
}
}
Loading

0 comments on commit fe4eb46

Please sign in to comment.