Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Start catchup from n.commit & fix AppendEntry is stored at seq=ae.pindex+1 #5987

Merged
merged 7 commits into from
Oct 15, 2024
Merged
162 changes: 162 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4237,3 +4237,165 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
_, err = js.StreamInfo("TEST")
require_NoError(t, err)
}

func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

streamLeader := si.Cluster.Leader
streamLeaderServer := c.serverByName(streamLeader)
nc.Close()
nc, js = jsClientConnect(t, streamLeaderServer)
defer nc.Close()

servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool {
return s == streamLeader
})

// Stop followers so further publishes will not have quorum.
followerName1 := servers[0]
followerName2 := servers[1]
followerServer1 := c.serverByName(followerName1)
followerServer2 := c.serverByName(followerName2)
followerServer1.Shutdown()
followerServer2.Shutdown()
followerServer1.WaitForShutdown()
followerServer2.WaitForShutdown()

// Although this request will time out, it will be added to the stream leader's WAL.
_, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second))
require_NotNil(t, err)
require_Equal(t, err, nats.ErrTimeout)

// Now shut down the leader as well.
nc.Close()
streamLeaderServer.Shutdown()
streamLeaderServer.WaitForShutdown()

// Only restart the (previous) followers.
followerServer1 = c.restartServer(followerServer1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is one server variable reassigned and not the other?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used below to have a connection to that server:

nc, js = jsClientConnect(t, followerServer1)

The connection could be to either server, as long as it's not the (previous) leader. So only this one variable is used to setup the connection.

c.restartServer(followerServer2)
c.waitOnStreamLeader(globalAccountName, "TEST")

nc, js = jsClientConnect(t, followerServer1)
defer nc.Close()

// Publishing a message will now have quorum.
pubAck, err := js.Publish("foo", []byte("first, this is a retry"))
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, 1)

// Bring up the previous stream leader.
c.restartServer(streamLeaderServer)
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")

// Check all servers ended up with the last published message, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")

acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
state := mset.state()
require_Equal(t, state.Msgs, 1)
require_Equal(t, state.Bytes, 55)
}
}

func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Replicas: 3,
})
nc.Close()
require_NoError(t, err)

// Pick one server that will only store a part of the messages in its WAL.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
ts := time.Now().UnixNano()

// Manually add 3 append entries to each node's WAL, except for one node who is one behind.
var scratch [1024]byte
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit may use an explanation... maybe

Manually add 3 append entries to each node's WAL, except for one node who is one behind

I actually am not sure. Your inner loop goes to 3, but then you have a break at 1.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That description is correct. Two servers will have 3 uncommitted entries, and one server will have 2 uncommitted entries so it needs to catchup for that third one.

Have moved that condition for that one server up, so it's a bit clearer it gets 2 iterations of that loop.

for _, s := range c.servers {
for _, n := range s.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
for i := uint64(0); i < 3; i++ {
// One server will be one behind and need to catchup.
if s.Name() == rs.Name() && i >= 2 {
break
}

esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
rn.Lock()
ae := rn.buildAppendEntry(entries)
ae.buf, err = ae.encode(scratch[:])
require_NoError(t, err)
err = rn.storeToWAL(ae)
rn.Unlock()
require_NoError(t, err)
}
}
}
}

// Restart all.
c.stopAll()
c.restartAll()
c.waitOnAllCurrent()
c.waitOnStreamLeader(globalAccountName, "TEST")

rs = c.serverByName(rs.Name())

// Check all servers ended up with all published messages, which had quorum.
for _, s := range c.servers {
c.waitOnStreamCurrent(s, globalAccountName, "TEST")

acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
state := mset.state()
require_Equal(t, state.Msgs, 3)
require_Equal(t, state.Bytes, 99)
}

// Check that the first two published messages came from our WAL, and
// the last came from a catchup by another leader.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me you are doing the same check for all 3 entries you look at, this comment is maybe outdated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 3 checks, 2x require_Equal and 1x require_NotEqual. Have changed it to use require_True with == and != instead, that seems a bit more clear.
(And it doesn't matter what the values being compared are, just that they either match or not)

for _, n := range rs.raftNodes {
rn := n.(*raft)
if rn.accName == globalAccountName {
ae, err := rn.loadEntry(2)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())

ae, err = rn.loadEntry(3)
require_NoError(t, err)
require_True(t, ae.leader == rn.ID())

ae, err = rn.loadEntry(4)
require_NoError(t, err)
require_True(t, ae.leader != rn.ID())
}
}
}
48 changes: 32 additions & 16 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined})
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
// initRaftNode will initialize the raft node, to be used by startRaftNode or when testing to not run the Go routine.
func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (*raft, error) {
if cfg == nil {
return nil, errNilCfg
}
Expand Down Expand Up @@ -520,6 +520,16 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
labels["group"] = n.group
s.registerRaftNode(n.group, n)

return n, nil
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
n, err := s.initRaftNode(accName, cfg, labels)
if err != nil {
return nil, err
}

// Start the run goroutine for the Raft state machine.
s.startGoRoutine(n.run, labels)

Expand Down Expand Up @@ -3141,10 +3151,10 @@ func (n *raft) catchupStalled() bool {
if n.catchup == nil {
return false
}
if n.catchup.pindex == n.pindex {
if n.catchup.pindex == n.commit {
return time.Since(n.catchup.active) > 2*time.Second
}
n.catchup.pindex = n.pindex
n.catchup.pindex = n.commit
n.catchup.active = time.Now()
return false
}
Expand All @@ -3163,7 +3173,7 @@ func (n *raft) createCatchup(ae *appendEntry) string {
cterm: ae.pterm,
cindex: ae.pindex,
pterm: n.pterm,
pindex: n.pindex,
pindex: n.commit,
active: time.Now(),
}
inbox := n.newCatchupInbox()
Expand Down Expand Up @@ -3333,7 +3343,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.catchupStalled() {
n.debug("Catchup may be stalled, will request again")
inbox = n.createCatchup(ae)
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false)
}
n.Unlock()
if ar != nil {
Expand Down Expand Up @@ -3374,28 +3384,34 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
// Check if this is a lower index than what we were expecting.
if ae.pindex < n.pindex {
n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse

// An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex.
seq := ae.pindex + 1
var success bool
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
} else {
n.resetWAL()
}
} else {
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
} else if eae.term != ae.term {
// If terms mismatched, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(eae.pterm, eae.pindex)
} else {
success = true
}
// Cancel regardless if truncated/unsuccessful.
if !success {
n.cancelCatchup()
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
Expand Down Expand Up @@ -3469,11 +3485,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
return

} else {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
if ae.pindex > n.pindex {
n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit)
if ae.pindex > n.commit {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, inbox, ar.encode(arbuf))
arPool.Put(ar)
Expand Down
Loading