Skip to content

Commit

Permalink
raft: add data-driven test for subtle async storage write aba problem
Browse files Browse the repository at this point in the history
This commit adds a new data-driven test the reproduces a scenario
similar to the one described in newStorageAppendRespMsg, exercising a
few interesting interactions between asynchronous storage writes, term
changes, and log truncation.

Signed-off-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
nvanbenschoten committed Oct 27, 2022
1 parent ea3fa37 commit d953f70
Show file tree
Hide file tree
Showing 6 changed files with 510 additions and 5 deletions.
4 changes: 4 additions & 0 deletions raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,21 @@ func (u *unstable) stableTo(i, t uint64) {
gt, ok := u.maybeTerm(i)
if !ok {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", i)
return
}
if i < u.offset {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", i)
return
}
if gt != t {
// Term mismatch between unstable entry and specified entry. Ignore.
// This is possible if part or all of the unstable log was replaced
// between that time that a set of entries started to be written to
// stable storage and when they finished.
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
"entry at (%d,%d) in unstable log; ignoring", i, t, i, gt)
return
}
num := int(i + 1 - u.offset)
Expand Down
1 change: 0 additions & 1 deletion raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,6 @@ func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message {
// attest that this (index, term) is correct at the current term, in case the
// MsgStorageAppend that contained the last entry in the unstable slice carried
// an earlier term and was dropped.
// TODO(nvanbenschoten): test this behavior in a data-driven test.
m.Index = r.raftLog.lastIndex()
m.LogTerm = r.raftLog.lastTerm()
}
Expand Down
2 changes: 1 addition & 1 deletion raft/rafttest/interaction_env_handler_deliver_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int {
var n int
for _, r := range rs {
var msgs []raftpb.Message
msgs, env.Messages = splitMsgs(env.Messages, r.ID)
msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop)
n += len(msgs)
for _, msg := range msgs {
if r.Drop {
Expand Down
12 changes: 9 additions & 3 deletions raft/rafttest/interaction_env_handler_stabilize.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/cockroachdb/datadriven"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
)

Expand Down Expand Up @@ -59,7 +60,7 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
id := rn.Status().ID
// NB: we grab the messages just to see whether to print the header.
// DeliverMsgs will do it again.
if msgs, _ := splitMsgs(env.Messages, id); len(msgs) > 0 {
if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 {
fmt.Fprintf(env.Output, "> %d receiving messages\n", id)
env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) })
done = false
Expand Down Expand Up @@ -95,14 +96,19 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
}
}

func splitMsgs(msgs []raftpb.Message, to uint64) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// NB: this method does not reorder messages.
for _, msg := range msgs {
if msg.To == to {
if msg.To == to && !(drop && isLocalMsg(msg)) {
toMsgs = append(toMsgs, msg)
} else {
rmdr = append(rmdr, msg)
}
}
return toMsgs, rmdr
}

// Don't drop local messages, which require reliable delivery.
func isLocalMsg(msg raftpb.Message) bool {
return msg.From == msg.To || raft.IsLocalMsgTarget(msg.From) || raft.IsLocalMsgTarget(msg.To)
}
3 changes: 3 additions & 0 deletions raft/testdata/async_storage_writes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,10 @@ deliver-msgs 1 2 3
3->1 MsgAppResp Term:1 Log:0/15
ApplyThread->1 MsgStorageApplyResp Term:0 Log:6/14
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15
INFO entry at index 15 missing from unstable log; ignoring
ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/14
AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15
INFO entry at index 15 missing from unstable log; ignoring
ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/14

process-ready 1 2 3
Expand Down Expand Up @@ -782,4 +784,5 @@ stabilize
AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0
> 1 receiving messages
AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15
INFO entry at index 15 missing from unstable log; ignoring
AppendThread->1 MsgStorageAppendResp Term:1 Log:0/0
Loading

0 comments on commit d953f70

Please sign in to comment.