From 7ede50cf597e7d5d8dbdb21d611e71ac41f2f210 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 2 Nov 2022 17:12:59 -0400 Subject: [PATCH] raft: move Ready constructor to rawnode.go Pure code movement. Eliminates asyncStorageWrites handling in node.go. Signed-off-by: Nathan VanBenschoten --- raft/node.go | 239 ------------------------------------------------ raft/rawnode.go | 237 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 236 insertions(+), 240 deletions(-) diff --git a/raft/node.go b/raft/node.go index b5b8af99102d..b08a669003ce 100644 --- a/raft/node.go +++ b/raft/node.go @@ -571,242 +571,3 @@ func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) } - -// TODO(nvanbenschoten): move this function and the functions below it to -// rawnode.go. -func newReady(r *raft, asyncStorageWrites bool, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { - rd := Ready{ - Entries: r.raftLog.nextUnstableEnts(), - CommittedEntries: r.raftLog.nextCommittedEnts(!asyncStorageWrites), - Messages: r.msgs, - } - if softSt := r.softState(); !softSt.equal(prevSoftSt) { - rd.SoftState = softSt - } - if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { - rd.HardState = hardSt - } - if r.raftLog.hasNextUnstableSnapshot() { - rd.Snapshot = *r.raftLog.nextUnstableSnapshot() - } - if len(r.readStates) != 0 { - rd.ReadStates = r.readStates - } - rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries)) - - if asyncStorageWrites { - // If async storage writes are enabled, enqueue messages to - // local storage threads, where applicable. - if needStorageAppend(rd, len(r.msgsAfterAppend) > 0) { - m := newStorageAppendMsg(r, rd) - rd.Messages = append(rd.Messages, m) - } - if needStorageApply(rd) { - m := newStorageApplyMsg(r, rd) - rd.Messages = append(rd.Messages, m) - } - } else { - // If async storage writes are disabled, immediately enqueue - // msgsAfterAppend to be sent out. The Ready struct contract - // mandates that Messages cannot be sent until after Entries - // are written to stable storage. - for _, m := range r.msgsAfterAppend { - if m.To != r.id { - rd.Messages = append(rd.Messages, m) - } - } - } - - return rd -} - -// MustSync returns true if the hard state and count of Raft entries indicate -// that a synchronous write to persistent storage is required. -func MustSync(st, prevst pb.HardState, entsnum int) bool { - // Persistent state on all servers: - // (Updated on stable storage before responding to RPCs) - // currentTerm - // votedFor - // log entries[] - return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term -} - -func needStorageAppend(rd Ready, haveMsgsAfterAppend bool) bool { - // Return true if log entries, hard state, or a snapshot need to be written - // to stable storage. Also return true if any messages are contingent on all - // prior MsgStorageAppend being processed. - return len(rd.Entries) > 0 || - !IsEmptyHardState(rd.HardState) || - !IsEmptySnap(rd.Snapshot) || - haveMsgsAfterAppend -} - -// newStorageAppendMsg creates the message that should be sent to the local -// append thread to instruct it to append log entries, write an updated hard -// state, and apply a snapshot. The message also carries a set of responses -// that should be delivered after the rest of the message is processed. Used -// with AsyncStorageWrites. 
-func newStorageAppendMsg(r *raft, rd Ready) pb.Message { - m := pb.Message{ - Type: pb.MsgStorageAppend, - To: LocalAppendThread, - From: r.id, - Term: r.Term, - Entries: rd.Entries, - } - if !IsEmptyHardState(rd.HardState) { - hs := rd.HardState - m.HardState = &hs - } - if !IsEmptySnap(rd.Snapshot) { - snap := rd.Snapshot - m.Snapshot = &snap - } - // Attach all messages in msgsAfterAppend as responses to be delivered after - // the message is processed, along with a self-directed MsgStorageAppendResp - // to acknowledge the entry stability. - // - // NB: it is important for performance that MsgStorageAppendResp message be - // handled after self-directed MsgAppResp messages on the leader (which will - // be contained in msgsAfterAppend). This ordering allows the MsgAppResp - // handling to use a fast-path in r.raftLog.term() before the newly appended - // entries are removed from the unstable log. - m.Responses = r.msgsAfterAppend - m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) - return m -} - -// newStorageAppendRespMsg creates the message that should be returned to node -// after the unstable log entries, hard state, and snapshot in the current Ready -// (along with those in all prior Ready structs) have been saved to stable -// storage. -func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message { - m := pb.Message{ - Type: pb.MsgStorageAppendResp, - To: r.id, - From: LocalAppendThread, - // Dropped after term change, see below. - Term: r.Term, - } - if r.raftLog.hasNextOrInProgressUnstableEnts() { - // If the raft log has unstable entries, attach the last index and term to the - // response message. This (index, term) tuple will be handed back and consulted - // when the stability of those log entries is signaled to the unstable. If the - // (index, term) match the unstable log by the time the response is received, - // the unstable log can be truncated. - // - // However, with just this logic, there would be an ABA problem that could lead - // to the unstable log and the stable log getting out of sync temporarily and - // leading to an inconsistent view. Consider the following example with 5 nodes, - // A B C D E: - // - // 1. A is the leader. - // 2. A proposes some log entries but only B receives these entries. - // 3. B gets the Ready and the entries are appended asynchronously. - // 4. A crashes and C becomes leader after getting a vote from D and E. - // 5. C proposes some log entries and B receives these entries, overwriting the - // previous unstable log entries that are in the process of being appended. - // The entries have a larger term than the previous entries but the same - // indexes. It begins appending these new entries asynchronously. - // 6. C crashes and A restarts and becomes leader again after getting the vote - // from D and E. - // 7. B receives the entries from A which are the same as the ones from step 2, - // overwriting the previous unstable log entries that are in the process of - // being appended from step 5. The entries have the original terms and - // indexes from step 2. Recall that log entries retain their original term - // numbers when a leader replicates entries from previous terms. It begins - // appending these new entries asynchronously. - // 8. The asynchronous log appends from the first Ready complete and stableTo - // is called. - // 9. 
However, the log entries from the second Ready are still in the - // asynchronous append pipeline and will overwrite (in stable storage) the - // entries from the first Ready at some future point. We can't truncate the - // unstable log yet or a future read from Storage might see the entries from - // step 5 before they have been replaced by the entries from step 7. - // Instead, we must wait until we are sure that the entries are stable and - // that no in-progress appends might overwrite them before removing entries - // from the unstable log. - // - // To prevent these kinds of problems, we also attach the current term to the - // MsgStorageAppendResp (above). If the term has changed by the time the - // MsgStorageAppendResp if returned, the response is ignored and the unstable - // log is not truncated. The unstable log is only truncated when the term has - // remained unchanged from the time that the MsgStorageAppend was sent to the - // time that the MsgStorageAppendResp is received, indicating that no-one else - // is in the process of truncating the stable log. - // - // However, this replaces a correctness problem with a liveness problem. If we - // only attempted to truncate the unstable log when appending new entries but - // also occasionally dropped these responses, then quiescence of new log entries - // could lead to the unstable log never being truncated. - // - // To combat this, we attempt to truncate the log on all MsgStorageAppendResp - // messages where the unstable log is not empty, not just those associated with - // entry appends. This includes MsgStorageAppendResp messages associated with an - // updated HardState, which occur after a term change. - // - // In other words, we set Index and LogTerm in a block that looks like: - // - // if r.raftLog.hasNextOrInProgressUnstableEnts() { ... } - // - // not like: - // - // if len(rd.Entries) > 0 { ... } - // - // To do so, we attach r.raftLog.lastIndex() and r.raftLog.lastTerm(), not the - // (index, term) of the last entry in rd.Entries. If rd.Entries is not empty, - // these will be the same. However, if rd.Entries is empty, we still want to - // attest that this (index, term) is correct at the current term, in case the - // MsgStorageAppend that contained the last entry in the unstable slice carried - // an earlier term and was dropped. - m.Index = r.raftLog.lastIndex() - m.LogTerm = r.raftLog.lastTerm() - } - if !IsEmptySnap(rd.Snapshot) { - snap := rd.Snapshot - m.Snapshot = &snap - } - return m -} - -func needStorageApply(rd Ready) bool { - return len(rd.CommittedEntries) > 0 -} - -// newStorageApplyMsg creates the message that should be sent to the local -// apply thread to instruct it to apply committed log entries. The message -// also carries a response that should be delivered after the rest of the -// message is processed. Used with AsyncStorageWrites. -func newStorageApplyMsg(r *raft, rd Ready) pb.Message { - ents := rd.CommittedEntries - last := ents[len(ents)-1].Index - return pb.Message{ - Type: pb.MsgStorageApply, - To: LocalApplyThread, - From: r.id, - Term: 0, // committed entries don't apply under a specific term - Entries: ents, - Index: last, - Responses: []pb.Message{ - newStorageApplyRespMsg(r, ents), - }, - } -} - -// newStorageApplyRespMsg creates the message that should be returned to node -// after the committed entries in the current Ready (along with those in all -// prior Ready structs) have been applied to the local state machine. 
-func newStorageApplyRespMsg(r *raft, committedEnts []pb.Entry) pb.Message { - last := committedEnts[len(committedEnts)-1].Index - size := r.getUncommittedSize(committedEnts) - return pb.Message{ - Type: pb.MsgStorageApplyResp, - To: r.id, - From: LocalApplyThread, - Term: 0, // committed entries don't apply under a specific term - Index: last, - // NOTE: we abuse the LogTerm field to store the aggregate entry size so - // that we don't need to introduce a new field on Message. - LogTerm: size, - } -} diff --git a/raft/rawnode.go b/raft/rawnode.go index 5a345d6f2e2f..dd52a5c39c5a 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -136,7 +136,242 @@ func (rn *RawNode) Ready() Ready { // readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there // is no obligation that the Ready must be handled. func (rn *RawNode) readyWithoutAccept() Ready { - return newReady(rn.raft, rn.asyncStorageWrites, rn.prevSoftSt, rn.prevHardSt) + r := rn.raft + + rd := Ready{ + Entries: r.raftLog.nextUnstableEnts(), + CommittedEntries: r.raftLog.nextCommittedEnts(!rn.asyncStorageWrites), + Messages: r.msgs, + } + if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) { + rd.SoftState = softSt + } + if hardSt := r.hardState(); !isHardStateEqual(hardSt, rn.prevHardSt) { + rd.HardState = hardSt + } + if r.raftLog.hasNextUnstableSnapshot() { + rd.Snapshot = *r.raftLog.nextUnstableSnapshot() + } + if len(r.readStates) != 0 { + rd.ReadStates = r.readStates + } + rd.MustSync = MustSync(r.hardState(), rn.prevHardSt, len(rd.Entries)) + + if rn.asyncStorageWrites { + // If async storage writes are enabled, enqueue messages to + // local storage threads, where applicable. + if needStorageAppend(rd, len(r.msgsAfterAppend) > 0) { + m := newStorageAppendMsg(r, rd) + rd.Messages = append(rd.Messages, m) + } + if needStorageApply(rd) { + m := newStorageApplyMsg(r, rd) + rd.Messages = append(rd.Messages, m) + } + } else { + // If async storage writes are disabled, immediately enqueue + // msgsAfterAppend to be sent out. The Ready struct contract + // mandates that Messages cannot be sent until after Entries + // are written to stable storage. + for _, m := range r.msgsAfterAppend { + if m.To != r.id { + rd.Messages = append(rd.Messages, m) + } + } + } + + return rd +} + +// MustSync returns true if the hard state and count of Raft entries indicate +// that a synchronous write to persistent storage is required. +func MustSync(st, prevst pb.HardState, entsnum int) bool { + // Persistent state on all servers: + // (Updated on stable storage before responding to RPCs) + // currentTerm + // votedFor + // log entries[] + return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term +} + +func needStorageAppend(rd Ready, haveMsgsAfterAppend bool) bool { + // Return true if log entries, hard state, or a snapshot need to be written + // to stable storage. Also return true if any messages are contingent on all + // prior MsgStorageAppend being processed. + return len(rd.Entries) > 0 || + !IsEmptyHardState(rd.HardState) || + !IsEmptySnap(rd.Snapshot) || + haveMsgsAfterAppend +} + +// newStorageAppendMsg creates the message that should be sent to the local +// append thread to instruct it to append log entries, write an updated hard +// state, and apply a snapshot. The message also carries a set of responses +// that should be delivered after the rest of the message is processed. Used +// with AsyncStorageWrites. 
+func newStorageAppendMsg(r *raft, rd Ready) pb.Message { + m := pb.Message{ + Type: pb.MsgStorageAppend, + To: LocalAppendThread, + From: r.id, + Term: r.Term, + Entries: rd.Entries, + } + if !IsEmptyHardState(rd.HardState) { + hs := rd.HardState + m.HardState = &hs + } + if !IsEmptySnap(rd.Snapshot) { + snap := rd.Snapshot + m.Snapshot = &snap + } + // Attach all messages in msgsAfterAppend as responses to be delivered after + // the message is processed, along with a self-directed MsgStorageAppendResp + // to acknowledge the entry stability. + // + // NB: it is important for performance that MsgStorageAppendResp message be + // handled after self-directed MsgAppResp messages on the leader (which will + // be contained in msgsAfterAppend). This ordering allows the MsgAppResp + // handling to use a fast-path in r.raftLog.term() before the newly appended + // entries are removed from the unstable log. + m.Responses = r.msgsAfterAppend + m.Responses = append(m.Responses, newStorageAppendRespMsg(r, rd)) + return m +} + +// newStorageAppendRespMsg creates the message that should be returned to node +// after the unstable log entries, hard state, and snapshot in the current Ready +// (along with those in all prior Ready structs) have been saved to stable +// storage. +func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message { + m := pb.Message{ + Type: pb.MsgStorageAppendResp, + To: r.id, + From: LocalAppendThread, + // Dropped after term change, see below. + Term: r.Term, + } + if r.raftLog.hasNextOrInProgressUnstableEnts() { + // If the raft log has unstable entries, attach the last index and term to the + // response message. This (index, term) tuple will be handed back and consulted + // when the stability of those log entries is signaled to the unstable. If the + // (index, term) match the unstable log by the time the response is received, + // the unstable log can be truncated. + // + // However, with just this logic, there would be an ABA problem that could lead + // to the unstable log and the stable log getting out of sync temporarily and + // leading to an inconsistent view. Consider the following example with 5 nodes, + // A B C D E: + // + // 1. A is the leader. + // 2. A proposes some log entries but only B receives these entries. + // 3. B gets the Ready and the entries are appended asynchronously. + // 4. A crashes and C becomes leader after getting a vote from D and E. + // 5. C proposes some log entries and B receives these entries, overwriting the + // previous unstable log entries that are in the process of being appended. + // The entries have a larger term than the previous entries but the same + // indexes. It begins appending these new entries asynchronously. + // 6. C crashes and A restarts and becomes leader again after getting the vote + // from D and E. + // 7. B receives the entries from A which are the same as the ones from step 2, + // overwriting the previous unstable log entries that are in the process of + // being appended from step 5. The entries have the original terms and + // indexes from step 2. Recall that log entries retain their original term + // numbers when a leader replicates entries from previous terms. It begins + // appending these new entries asynchronously. + // 8. The asynchronous log appends from the first Ready complete and stableTo + // is called. + // 9. 
+ // However, the log entries from the second Ready are still in the
+ // asynchronous append pipeline and will overwrite (in stable storage) the
+ // entries from the first Ready at some future point. We can't truncate the
+ // unstable log yet or a future read from Storage might see the entries from
+ // step 5 before they have been replaced by the entries from step 7.
+ // Instead, we must wait until we are sure that the entries are stable and
+ // that no in-progress appends might overwrite them before removing entries
+ // from the unstable log.
+ //
+ // To prevent these kinds of problems, we also attach the current term to the
+ // MsgStorageAppendResp (above). If the term has changed by the time the
+ // MsgStorageAppendResp is returned, the response is ignored and the unstable
+ // log is not truncated. The unstable log is only truncated when the term has
+ // remained unchanged from the time that the MsgStorageAppend was sent to the
+ // time that the MsgStorageAppendResp is received, indicating that no-one else
+ // is in the process of truncating the stable log.
+ //
+ // However, this replaces a correctness problem with a liveness problem. If we
+ // only attempted to truncate the unstable log when appending new entries but
+ // also occasionally dropped these responses, then quiescence of new log entries
+ // could lead to the unstable log never being truncated.
+ //
+ // To combat this, we attempt to truncate the log on all MsgStorageAppendResp
+ // messages where the unstable log is not empty, not just those associated with
+ // entry appends. This includes MsgStorageAppendResp messages associated with an
+ // updated HardState, which occur after a term change.
+ //
+ // In other words, we set Index and LogTerm in a block that looks like:
+ //
+ // if r.raftLog.hasNextOrInProgressUnstableEnts() { ... }
+ //
+ // not like:
+ //
+ // if len(rd.Entries) > 0 { ... }
+ //
+ // To do so, we attach r.raftLog.lastIndex() and r.raftLog.lastTerm(), not the
+ // (index, term) of the last entry in rd.Entries. If rd.Entries is not empty,
+ // these will be the same. However, if rd.Entries is empty, we still want to
+ // attest that this (index, term) is correct at the current term, in case the
+ // MsgStorageAppend that contained the last entry in the unstable slice carried
+ // an earlier term and was dropped.
+ m.Index = r.raftLog.lastIndex()
+ m.LogTerm = r.raftLog.lastTerm()
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ snap := rd.Snapshot
+ m.Snapshot = &snap
+ }
+ return m
+}
+
+func needStorageApply(rd Ready) bool {
+ return len(rd.CommittedEntries) > 0
+}
+
+// newStorageApplyMsg creates the message that should be sent to the local
+// apply thread to instruct it to apply committed log entries. The message
+// also carries a response that should be delivered after the rest of the
+// message is processed. Used with AsyncStorageWrites.
+func newStorageApplyMsg(r *raft, rd Ready) pb.Message {
+ ents := rd.CommittedEntries
+ last := ents[len(ents)-1].Index
+ return pb.Message{
+ Type: pb.MsgStorageApply,
+ To: LocalApplyThread,
+ From: r.id,
+ Term: 0, // committed entries don't apply under a specific term
+ Entries: ents,
+ Index: last,
+ Responses: []pb.Message{
+ newStorageApplyRespMsg(r, ents),
+ },
+ }
+}
+
+// newStorageApplyRespMsg creates the message that should be returned to node
+// after the committed entries in the current Ready (along with those in all
+// prior Ready structs) have been applied to the local state machine.
+func newStorageApplyRespMsg(r *raft, committedEnts []pb.Entry) pb.Message { + last := committedEnts[len(committedEnts)-1].Index + size := r.getUncommittedSize(committedEnts) + return pb.Message{ + Type: pb.MsgStorageApplyResp, + To: r.id, + From: LocalApplyThread, + Term: 0, // committed entries don't apply under a specific term + Index: last, + // NOTE: we abuse the LogTerm field to store the aggregate entry size so + // that we don't need to introduce a new field on Message. + LogTerm: size, + } } // acceptReady is called when the consumer of the RawNode has decided to go
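For readers wiring this up outside the patch: with AsyncStorageWrites enabled, the MsgStorageAppend and MsgStorageApply messages constructed above arrive in Ready.Messages addressed to LocalAppendThread and LocalApplyThread, and their attached Responses must only be delivered once the corresponding write or apply has completed. The sketch below shows one way a consumer might route these messages; the package path, channel names, persist callback, and selfID parameter are illustrative assumptions, not part of this change.

package storageloop

import (
	raft "go.etcd.io/etcd/raft/v3" // assumed import path for this vintage of the package
	pb "go.etcd.io/etcd/raft/v3/raftpb"
)

// routeReady forwards the messages in a Ready when AsyncStorageWrites is
// enabled: messages addressed to the local storage threads go to worker
// goroutines, everything else goes to the network transport.
func routeReady(rd raft.Ready, appendC, applyC chan<- pb.Message, send func(pb.Message)) {
	for _, m := range rd.Messages {
		switch m.To {
		case raft.LocalAppendThread:
			appendC <- m // MsgStorageAppend: entries, HardState, snapshot to persist
		case raft.LocalApplyThread:
			applyC <- m // MsgStorageApply: committed entries to apply
		default:
			send(m) // regular peer-to-peer traffic
		}
	}
}

// appendWorker is a sketch of the local append thread. persist is an assumed
// callback that durably writes the entries, hard state, and snapshot carried
// by a MsgStorageAppend. Only after that write is stable are the attached
// Responses delivered: local ones are stepped back into the node, remote ones
// (the msgsAfterAppend destined for peers) are sent over the transport.
func appendWorker(appendC <-chan pb.Message, selfID uint64, persist func(pb.Message) error, stepC chan<- pb.Message, send func(pb.Message)) {
	for m := range appendC {
		if err := persist(m); err != nil {
			panic(err) // a real implementation would surface the error instead
		}
		for _, resp := range m.Responses {
			if resp.To == selfID {
				stepC <- resp // eventually fed to RawNode.Step on the raft goroutine
			} else {
				send(resp)
			}
		}
	}
}

In this shape, the self-directed MsgStorageAppendResp built above is stepped back into the RawNode only after the write is durable, and the msgsAfterAppend destined for peers leave the node only at that same point, which is exactly the ordering the Ready contract and the comments in this patch require.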