Skip to content

Commit

Permalink
NRG (2.11): Start catchup from n.commit & fix AppendEntry is stored…
Browse files Browse the repository at this point in the history
… at `seq=ae.pindex+1` (#5987)

This PR makes three complementary fixes to the way how catchup and
truncating is handled.
Specifically:
- when doing `n.loadEntry(index)` we need to pass where the AppendEntry
is in terms of stream sequence, this is equal to `ae.pindex+1` since the
`ae.pindex` is the value before it's stored in the stream.
- start catchup from `n.commit`, we could have messages past our commit
that have been invalidated and need to be truncated since there was a
switch between leaders
- because we catchup from `n.commit`, we check if our local AppendEntry
matches terms with the incoming AppendEntry, we only need to truncate if
the terms don't match

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Oct 15, 2024
1 parent 6c0131f commit ac5ba12
Show file tree
Hide file tree
Showing 3 changed files with 409 additions and 76 deletions.
162 changes: 162 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4274,3 +4274,165 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
_, err = js.StreamInfo("TEST")
require_NoError(t, err)
}

func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

streamLeader := si.Cluster.Leader
streamLeaderServer := c.serverByName(streamLeader)
nc.Close()
nc, js = jsClientConnect(t, streamLeaderServer)
defer nc.Close()

servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
return s == streamLeader
})

// Stop followers so further publishes will not have quorum.
followerName1 := servers[0]
followerName2 := servers[1]
followerServer1 := c.serverByName(followerName1)
followerServer2 := c.serverByName(followerName2)
followerServer1.Shutdown()
followerServer2.Shutdown()
followerServer1.WaitForShutdown()
followerServer2.WaitForShutdown()

// Although this request will time out, it will be added to the stream leader's WAL.
_, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second))
require_NotNil(t, err)
require_Equal(t, err, nats.ErrTimeout)

// Now shut down the leader as well.
nc.Close()
streamLeaderServer.Shutdown()
streamLeaderServer.WaitForShutdown()

// Only restart the (previous) followers.
followerServer1 = c.restartServer(followerServer1)
c.restartServer(followerServer2)
c.waitOnStreamLeader(globalAccountName, "TEST")

nc, js = jsClientConnect(t, followerServer1)
defer nc.Close()

// Publishing a message will now have quorum.
pubAck, err := js.Publish("foo", []byte("first, this is a retry"))
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)

// Bring up the previous stream leader.
c.restartServer(streamLeaderServer)
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")

// Check all servers ended up with the last published message, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")

acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
state := mset.state()
require_Equal(t, state.Msgs, 1)
require_Equal(t, state.Bytes, 55)
}
}

func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Replicas: 3,
})
nc.Close()
require_NoError(t, err)

// Pick one server that will only store a part of the messages in its WAL.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
ts := time.Now().UnixNano()

// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
var scratch [1024]byte
for _, s := range c.servers {
for _, n := range s.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
for i := uint64(0); i < 3; i++ {
// One server will be one behind and need to catchup.
if s.Name() == rs.Name() && i >= 2 {
break
}

esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
rn.Lock()
ae := rn.buildAppendEntry(entries)
ae.buf, err = ae.encode(scratch[:])
require_NoError(t, err)
err = rn.storeToWAL(ae)
rn.Unlock()
require_NoError(t, err)
}
}
}
}

// Restart all.
c.stopAll()
c.restartAll()
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")

rs = c.serverByName(rs.Name())

// Check all servers ended up with all published messages, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")

acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
state := mset.state()
require_Equal(t, state.Msgs, 3)
require_Equal(t, state.Bytes, 99)
}

// Check that the first two published messages came from our WAL, and
// the last came from a catchup by another leader.
for _, n := range rs.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
ae, err := rn.loadEntry(2)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())

ae, err = rn.loadEntry(3)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())

ae, err = rn.loadEntry(4)
require_NoError(t, err)
require_True(t, ae.leader != rn.ID())
}
}
}
48 changes: 32 additions & 16 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined})
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
// initRaftNode will initialize the raft node, to be used by startRaftNode or when testing to not run the Go routine.
func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (*raft, error) {
if cfg == nil {
return nil, errNilCfg
}
Expand Down Expand Up @@ -520,6 +520,16 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
labels["group"] = n.group
s.registerRaftNode(n.group, n)

return n, nil
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
n, err := s.initRaftNode(accName, cfg, labels)
if err != nil {
return nil, err
}

// Start the run goroutine for the Raft state machine.
s.startGoRoutine(n.run, labels)

Expand Down Expand Up @@ -3141,10 +3151,10 @@ func (n *raft) catchupStalled() bool {
if n.catchup == nil {
return false
}
if n.catchup.pindex == n.pindex {
if n.catchup.pindex == n.commit {
return time.Since(n.catchup.active) > 2*time.Second
}
n.catchup.pindex = n.pindex
n.catchup.pindex = n.commit
n.catchup.active = time.Now()
return false
}
Expand All @@ -3163,7 +3173,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
cterm: ae.pterm,
cindex: ae.pindex,
pterm: n.pterm,
pindex: n.pindex,
pindex: n.commit,
active: time.Now(),
}
inbox := n.newCatchupInbox()
Expand Down Expand Up @@ -3333,7 +3343,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false)
}
n.Unlock()
if ar != nil {
Expand Down Expand Up @@ -3374,28 +3384,34 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

// An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex.
seq := ae.pindex + 1
var success bool
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else {
n.resetWAL()
}
} else {
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
} else if eae.term != ae.term {
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
} else {
success = true
}
// Cancel regardless if truncated/unsuccessful.
if !success {
n.cancelCatchup()
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
Expand Down Expand Up @@ -3469,11 +3485,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return

} else {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
if ae.pindex > n.pindex {
n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit)
if ae.pindex > n.commit {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
Expand Down
Loading

0 comments on commit ac5ba12

Please sign in to comment.