NRG (2.11): Start catchup from n.commit & fix AppendEntry is stored at `seq=ae.pindex+1` (#5987)

This PR makes three complementary fixes to the way catchup and
truncation are handled.
Specifically:
- when calling `n.loadEntry(index)` we need to pass the stream sequence at
which the AppendEntry is stored; this is `ae.pindex+1`, since `ae.pindex`
is the index from before the entry was stored in the stream.
- start catchup from `n.commit`; we could have messages past our commit
that were invalidated by a switch between leaders and need to be
truncated.
- because we catch up from `n.commit`, we check whether our local
AppendEntry's term matches the incoming AppendEntry's term, and only
truncate if the terms don't match (a simplified sketch of this combined
logic follows below).
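
A minimal, self-contained sketch of how these pieces fit together, using simplified stand-in types rather than the server's actual `raft` and `appendEntry` structs (the `handleOldIndex` helper and its fields are illustrative only):

```go
package main

import "fmt"

// Simplified stand-ins for the raft types involved; this is an illustrative
// sketch, not the server's actual processAppendEntry implementation.
type appendEntry struct {
	term   uint64 // term of the entry itself
	pterm  uint64 // term of the previous entry
	pindex uint64 // index of the previous entry
}

type node struct {
	pterm  uint64
	pindex uint64
	commit uint64
	wal    map[uint64]*appendEntry // seq -> stored entry
}

// handleOldIndex sketches the decision made when an incoming AppendEntry has
// ae.pindex < n.pindex: the entry would be stored at seq = ae.pindex+1, so we
// load that sequence and only truncate when the terms actually differ.
func (n *node) handleOldIndex(ae *appendEntry) string {
	seq := ae.pindex + 1 // an AppendEntry is stored at seq = ae.pindex+1
	eae, ok := n.wal[seq]
	switch {
	case !ok:
		// Nothing stored there; if terms line up we already processed it.
		if ae.pterm == n.pterm {
			return "already processed, ack"
		}
		return "reset WAL"
	case eae.term != ae.term:
		// Divergent history past our commit, invalidated by a leader change.
		return "truncate WAL after this entry"
	default:
		// Same term: local entry matches, no truncation needed.
		return "match, ack"
	}
}

func main() {
	n := &node{pterm: 2, pindex: 5, commit: 3,
		wal: map[uint64]*appendEntry{4: {term: 1, pterm: 1, pindex: 3}}}
	// Leader replays from our commit (pindex=3); local term 1 vs incoming term 2.
	fmt.Println(n.handleOldIndex(&appendEntry{term: 2, pterm: 1, pindex: 3}))
}
```

Running this prints `truncate WAL after this entry`, since the stored entry at `seq=4` was written under the old leader's term.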

Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
MauriceVanVeen authored and neilalexander committed Nov 25, 2024
1 parent f55f34e commit 3025778
Showing 3 changed files with 349 additions and 182 deletions.
222 changes: 114 additions & 108 deletions server/jetstream_cluster_4_test.go
@@ -18,7 +18,6 @@ package server

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@@ -3729,9 +3728,7 @@ func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
for _, n := range server.raftNodes {
rn := n.(*raft)
if rn.accName == "$G" {
rn.Lock()
rn.updateLeader(noLeader)
rn.Unlock()
}
}

@@ -3996,129 +3993,138 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
}
}

func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "CONSUMER",
Replicas: 3,
AckPolicy: nats.AckExplicitPolicy,
streamLeader := si.Cluster.Leader
streamLeaderServer := c.serverByName(streamLeader)
nc.Close()
nc, js = jsClientConnect(t, streamLeaderServer)
defer nc.Close()
servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
return s == streamLeader
})
require_NoError(t, err)

// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1.
_, err = js.Publish("foo", nil)
require_NoError(t, err)
sub, err := js.PullSubscribe("foo", "CONSUMER")
require_NoError(t, err)
msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
err = msgs[0].AckSync()
require_NoError(t, err)

// We don't need the client anymore.
// Stop followers so further publishes will not have quorum.
followerName1 := servers[0]
followerName2 := servers[1]
followerServer1 := c.serverByName(followerName1)
followerServer2 := c.serverByName(followerName2)
followerServer1.Shutdown()
followerServer2.Shutdown()
followerServer1.WaitForShutdown()
followerServer2.WaitForShutdown()
// Although this request will time out, it will be added to the stream leader's WAL.
_, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second))
require_NotNil(t, err)
require_Equal(t, err, nats.ErrTimeout)
// Now shut down the leader as well.
nc.Close()

lookupConsumer := func(s *Server) *consumer {
t.Helper()
mset, err := s.lookupAccount(globalAccountName)
streamLeaderServer.Shutdown()
streamLeaderServer.WaitForShutdown()
// Only restart the (previous) followers.
followerServer1 = c.restartServer(followerServer1)
c.restartServer(followerServer2)
c.waitOnStreamLeader(globalAccountName, "TEST")
nc, js = jsClientConnect(t, followerServer1)
defer nc.Close()
// Publishing a message will now have quorum.
pubAck, err := js.Publish("foo", []byte("first, this is a retry"))
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)
// Bring up the previous stream leader.
c.restartServer(streamLeaderServer)
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")
// Check all servers ended up with the last published message, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
acc, err := mset.lookupStream("TEST")
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
o := acc.lookupConsumer("CONSUMER")
require_NotNil(t, o)
return o
state := mset.state()
require_Equal(t, state.Msgs, 1)
require_Equal(t, state.Bytes, 55)
}

// Grab current consumer leader before moving all into observer mode.
cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
}
func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Replicas: 3,
})
nc.Close()
require_NoError(t, err)
// Pick one server that will only store a part of the messages in its WAL.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
ts := time.Now().UnixNano()
// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
var scratch [1024]byte
for _, s := range c.servers {
// Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader.
o := lookupConsumer(s)
o.node.SetObserver(true)
if s != cl {
// For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot.
err = o.node.PauseApply()
require_NoError(t, err)
for _, n := range s.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
for i := uint64(0); i < 3; i++ {
// One server will be one behind and need to catchup.
if s.Name() == rs.Name() && i >= 2 {
break
}
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
rn.Lock()
ae := rn.buildAppendEntry(entries)
ae.buf, err = ae.encode(scratch[:])
require_NoError(t, err)
err = rn.storeToWAL(ae)
rn.Unlock()
require_NoError(t, err)
}
}
}
}

updateDeliveredBuffer := func() []byte {
var b [4*binary.MaxVarintLen64 + 1]byte
b[0] = byte(updateDeliveredOp)
n := 1
n += binary.PutUvarint(b[n:], 100)
n += binary.PutUvarint(b[n:], 100)
n += binary.PutUvarint(b[n:], 1)
n += binary.PutVarint(b[n:], time.Now().UnixNano())
return b[:n]
}

updateAcksBuffer := func() []byte {
var b [2*binary.MaxVarintLen64 + 1]byte
b[0] = byte(updateAcksOp)
n := 1
n += binary.PutUvarint(b[n:], 100)
n += binary.PutUvarint(b[n:], 100)
return b[:n]
}

// Store an uncommitted entry into our WAL, which will be committed and applied later.
co := lookupConsumer(cl)
rn := co.node.(*raft)
rn.Lock()
entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}}
ae := encode(t, rn.buildAppendEntry(entries))
err = rn.storeToWAL(ae)
minPindex := rn.pindex
rn.Unlock()
require_NoError(t, err)

// Simulate leader change, we do this so we can check what happens in the upper layer logic.
rn.leadc <- true
rn.SetObserver(false)

// Since upper layer is async, we don't know whether it will or will not act on the leader change.
// Wait for some time to check if it does.
time.Sleep(2 * time.Second)
rn.RLock()
maxPindex := rn.pindex
rn.RUnlock()

r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
ro := lookupConsumer(r)
rn = ro.node.(*raft)

checkFor(t, 5*time.Second, time.Second, func() error {
rn.RLock()
defer rn.RUnlock()
if rn.pindex < maxPindex {
return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex)
}
return nil
})

// We should only have 'Normal' entries.
// If we'd get a 'Snapshot' entry, that would mean it had incomplete state and would be reverting committed state.
var state StreamState
rn.wal.FastState(&state)
for seq := minPindex; seq <= maxPindex; seq++ {
ae, err = rn.loadEntry(seq)
// Restart all.
c.stopAll()
c.restartAll()
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")
rs = c.serverByName(rs.Name())
// Check all servers ended up with all published messages, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
for _, entry := range ae.entries {
require_Equal(t, entry.Type, EntryNormal)
state := mset.state()
require_Equal(t, state.Msgs, 3)
require_Equal(t, state.Bytes, 99)
}
// Check that the first two published messages came from our WAL, and
// the last came from a catchup by another leader.
for _, n := range rs.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
ae, err := rn.loadEntry(2)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())
ae, err = rn.loadEntry(3)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())
ae, err = rn.loadEntry(4)
require_NoError(t, err)
require_True(t, ae.leader != rn.ID())
}
}
}
34 changes: 20 additions & 14 deletions server/raft.go
@@ -3082,10 +3082,10 @@ func (n *raft) catchupStalled() bool {
if n.catchup == nil {
return false
}
if n.catchup.pindex == n.pindex {
if n.catchup.pindex == n.commit {
return time.Since(n.catchup.active) > 2*time.Second
}
n.catchup.pindex = n.pindex
n.catchup.pindex = n.commit
n.catchup.active = time.Now()
return false
}
@@ -3104,7 +3104,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
cterm: ae.pterm,
cindex: ae.pindex,
pterm: n.pterm,
pindex: n.pindex,
pindex: n.commit,
active: time.Now(),
}
inbox := n.newCatchupInbox()
@@ -3274,7 +3274,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false)
}
n.Unlock()
if ar != nil {
@@ -3315,28 +3315,34 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

// An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex.
seq := ae.pindex + 1
var success bool
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else {
n.resetWAL()
}
} else {
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
} else if eae.term != ae.term {
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
} else {
success = true
}
// Cancel regardless if truncated/unsuccessful.
if !success {
n.cancelCatchup()
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
@@ -3410,11 +3416,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return

} else {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
if ae.pindex > n.pindex {
n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit)
if ae.pindex > n.commit {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
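
As a rough illustration of the `catchupStalled` change in the raft.go diff (progress is now tracked against the commit index instead of `pindex`), here is a runnable sketch with simplified stand-in types rather than the server's actual `raft` and catchup structs:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative sketch only: progress is measured against the commit index,
// mirroring the catchupStalled change, with simplified types.
type catchupState struct {
	pindex uint64    // last observed progress marker
	active time.Time // last time progress was observed
}

type node struct {
	commit  uint64
	catchup *catchupState
}

func (n *node) catchupStalled() bool {
	if n.catchup == nil {
		return false
	}
	if n.catchup.pindex == n.commit {
		// No forward progress on commit; stalled if quiet for over 2s.
		return time.Since(n.catchup.active) > 2*time.Second
	}
	// Commit moved since we last checked; record the new mark and keep waiting.
	n.catchup.pindex = n.commit
	n.catchup.active = time.Now()
	return false
}

func main() {
	n := &node{
		commit:  7,
		catchup: &catchupState{pindex: 7, active: time.Now().Add(-3 * time.Second)},
	}
	fmt.Println(n.catchupStalled()) // true: commit has not advanced for 3s
}
```

With the catchup marker stuck at the commit index and no activity for three seconds, the check reports a stall, which in the real code triggers a fresh catchup request.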
