[FIXED] Don't replace leader's snapshot during shutdown (#6053)
When a follower receives a snapshot from the leader and then restarts
(before it can complete the catchup), it overwrites the leader's
snapshot with its own. This results in a desynced stream, since the
follower does not retry the catchup.

Signed-off-by: Maurice van Veen <[email protected]>
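
To make the failure mode concrete, below is a minimal, self-contained sketch of the invariant the fix enforces. This is not the actual raft.go change; the node/snapshot types and the installSnapshotOnShutdown guard are assumptions made purely for illustration: a snapshot built from local applied state should only be persisted if it is at least as far along as the snapshot already on disk, so a follower that restarted mid-catchup keeps the leader's snapshot instead of clobbering it.

package main

import "fmt"

// snapshot is a simplified stand-in for a raft snapshot; only the last
// applied index matters for this illustration.
type snapshot struct {
	lastIndex uint64
}

// node is a toy model of a raft follower: applied tracks how far the local
// log has been applied, onDisk is the snapshot currently stored on disk.
type node struct {
	applied uint64
	onDisk  *snapshot
}

// installSnapshotOnShutdown sketches the guard implied by the fix: only
// write a snapshot built from local state if it is at least as advanced as
// the snapshot already on disk. A follower that restarted before finishing
// catchup has applied < onDisk.lastIndex, so it keeps the leader's snapshot.
func (n *node) installSnapshotOnShutdown() {
	if n.onDisk != nil && n.applied < n.onDisk.lastIndex {
		fmt.Println("keeping existing snapshot at index", n.onDisk.lastIndex)
		return
	}
	n.onDisk = &snapshot{lastIndex: n.applied}
	fmt.Println("wrote local snapshot at index", n.applied)
}

func main() {
	// The follower received the leader's snapshot at index 11 but had only
	// applied up to index 3 when it was shut down again.
	n := &node{applied: 3, onDisk: &snapshot{lastIndex: 11}}
	n.installSnapshotOnShutdown() // keeps the leader's snapshot at index 11
}

Before this change, the shutdown path would overwrite the on-disk snapshot with the follower's own (stale) state, which is exactly the scenario the new TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot in the diff below reproduces.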
derekcollison authored Oct 29, 2024
2 parents 06e5aa2 + 1e218f0 commit 4428dd9
Showing 3 changed files with 188 additions and 188 deletions.
264 changes: 77 additions & 187 deletions server/jetstream_cluster_4_test.go
@@ -21,8 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path"
@@ -2798,85 +2796,6 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) {
c.waitOnClusterReady()
c.waitOnAllCurrent()

getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
t.Helper()
srv := c.streamLeader(accountName, streamName)
if srv == nil {
return nil
}
jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
require_NoError(t, err)
for _, acc := range jsz.AccountDetails {
if acc.Name == accountName {
for _, stream := range acc.Streams {
if stream.Name == streamName {
return &stream
}
}
}
}
t.Error("Could not find account details")
return nil
}

checkState := func(t *testing.T, c *cluster, accountName, streamName string) error {
t.Helper()

leaderSrv := c.streamLeader(accountName, streamName)
if leaderSrv == nil {
return fmt.Errorf("no leader server found for stream %q", streamName)
}
streamLeader := getStreamDetails(t, c, accountName, streamName)
if streamLeader == nil {
return fmt.Errorf("no leader found for stream %q", streamName)
}
var errs []error
for _, srv := range c.servers {
if srv == leaderSrv {
// Skip self
continue
}
acc, err := srv.LookupAccount(accountName)
require_NoError(t, err)
stream, err := acc.lookupStream(streamName)
require_NoError(t, err)
state := stream.state()

if state.Msgs != streamLeader.State.Msgs {
err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages",
streamName, leaderSrv, streamLeader.State.Msgs,
srv, state.Msgs,
)
errs = append(errs, err)
}
if state.FirstSeq != streamLeader.State.FirstSeq {
err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d",
streamName, leaderSrv, streamLeader.State.FirstSeq,
srv, state.FirstSeq,
)
errs = append(errs, err)
}
if state.LastSeq != streamLeader.State.LastSeq {
err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d",
streamName, leaderSrv, streamLeader.State.LastSeq,
srv, state.LastSeq,
)
errs = append(errs, err)
}
if state.NumDeleted != streamLeader.State.NumDeleted {
err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d",
streamName, leaderSrv, streamLeader.State.NumDeleted,
srv, state.NumDeleted,
)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

err = checkState(t, c, "$G", "KV_inconsistency")
require_NoError(t, err)
}
@@ -2935,84 +2854,6 @@ func TestJetStreamClusterKeyValueSync(t *testing.T) {
var counter int64
var errorCounter int64

getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail {
t.Helper()
srv := c.streamLeader(accountName, streamName)
if srv == nil {
return nil
}
jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
require_NoError(t, err)
for _, acc := range jsz.AccountDetails {
if acc.Name == accountName {
for _, stream := range acc.Streams {
if stream.Name == streamName {
return &stream
}
}
}
}
t.Error("Could not find account details")
return nil
}
checkState := func(t *testing.T, c *cluster, accountName, streamName string) error {
t.Helper()

leaderSrv := c.streamLeader(accountName, streamName)
if leaderSrv == nil {
return fmt.Errorf("no leader server found for stream %q", streamName)
}
streamLeader := getStreamDetails(t, c, accountName, streamName)
if streamLeader == nil {
return fmt.Errorf("no leader found for stream %q", streamName)
}
var errs []error
for _, srv := range c.servers {
if srv == leaderSrv {
// Skip self
continue
}
acc, err := srv.LookupAccount(accountName)
require_NoError(t, err)
stream, err := acc.lookupStream(streamName)
require_NoError(t, err)
state := stream.state()

if state.Msgs != streamLeader.State.Msgs {
err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages",
streamName, leaderSrv, streamLeader.State.Msgs,
srv, state.Msgs,
)
errs = append(errs, err)
}
if state.FirstSeq != streamLeader.State.FirstSeq {
err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d",
streamName, leaderSrv, streamLeader.State.FirstSeq,
srv, state.FirstSeq,
)
errs = append(errs, err)
}
if state.LastSeq != streamLeader.State.LastSeq {
err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d",
streamName, leaderSrv, streamLeader.State.LastSeq,
srv, state.LastSeq,
)
errs = append(errs, err)
}
if state.NumDeleted != streamLeader.State.NumDeleted {
err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n",
streamName, leaderSrv, streamLeader.State.NumDeleted,
srv, state.NumDeleted, streamLeader.State, state,
)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

checkMsgsEqual := func(t *testing.T, accountName, streamName string) error {
// Gather all the streams replicas and compare contents.
msets := make(map[*Server]*stream)
@@ -4218,39 +4059,13 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
})
require_NoError(t, err)

copyDir := func(dst, src string) error {
srcFS := os.DirFS(src)
return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
if err != nil {
return err
}
newPath := path.Join(dst, p)
if d.IsDir() {
return os.MkdirAll(newPath, defaultDirPerms)
}
r, err := srcFS.Open(p)
if err != nil {
return err
}
defer r.Close()

w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
if err != nil {
return err
}
defer w.Close()
_, err = io.Copy(w, r)
return err
})
}
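
Note that this inline copyDir helper is removed by the commit; the calls further down now use copyDir(t, copySd, sd), which presumably refers to a shared test helper that takes the *testing.T. A sketch of what such a helper could look like under that assumption follows (the real helper's location, permission constants, and error handling are not shown in this diff; the literal permission bits below stand in for the package's defaultDirPerms/defaultFilePerms):

package server

import (
	"io"
	"io/fs"
	"os"
	"path"
	"testing"
)

// copyDir recursively copies the directory tree rooted at src into dst.
// Sketch of the assumed shared test helper only.
func copyDir(t *testing.T, dst, src string) error {
	t.Helper()
	srcFS := os.DirFS(src)
	return fs.WalkDir(srcFS, ".", func(p string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		newPath := path.Join(dst, p)
		if d.IsDir() {
			return os.MkdirAll(newPath, 0o750)
		}
		r, err := srcFS.Open(p)
		if err != nil {
			return err
		}
		defer r.Close()
		w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, 0o600)
		if err != nil {
			return err
		}
		defer w.Close()
		_, err = io.Copy(w, r)
		return err
	})
}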

// Simulate being hard killed by:
// 1. copy directories before shutdown
copyToSrcMap := make(map[string]string)
for _, s := range c.servers {
sd := s.StoreDir()
copySd := path.Join(t.TempDir(), JetStreamStoreDir)
err = copyDir(copySd, sd)
err = copyDir(t, copySd, sd)
require_NoError(t, err)
copyToSrcMap[copySd] = sd
}
@@ -4263,7 +4078,7 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
for cp, dest := range copyToSrcMap {
err = os.RemoveAll(dest)
require_NoError(t, err)
err = copyDir(dest, cp)
err = copyDir(t, dest, cp)
require_NoError(t, err)
}

@@ -4452,3 +4267,78 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
}
}
}

func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// Reconnect to the leader.
leader := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, js = jsClientConnect(t, leader)
defer nc.Close()

lookupStream := func(s *Server) *stream {
t.Helper()
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
return mset
}

// Stop one follower so it lags behind.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
mset := lookupStream(rs)
n := mset.node.(*raft)
followerSnapshots := path.Join(n.sd, snapshotsDir)
rs.Shutdown()
rs.WaitForShutdown()

// Move the stream forward so the follower requires a snapshot.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10})
require_NoError(t, err)
_, err = js.Publish("foo", nil)
require_NoError(t, err)

// Install a snapshot on the leader, ensuring RAFT entries are compacted and a snapshot remains.
mset = lookupStream(leader)
n = mset.node.(*raft)
err = n.InstallSnapshot(mset.stateSnapshot())
require_NoError(t, err)

c.stopAll()

// Replace follower snapshot with the leader's.
// This simulates the follower coming online, getting a snapshot from the leader after which it goes offline.
leaderSnapshots := path.Join(n.sd, snapshotsDir)
err = os.RemoveAll(followerSnapshots)
require_NoError(t, err)
err = copyDir(t, followerSnapshots, leaderSnapshots)
require_NoError(t, err)

// Start the follower, it will load the snapshot from the leader.
rs = c.restartServer(rs)

// Shutting down must check that the leader's snapshot is not overwritten.
rs.Shutdown()
rs.WaitForShutdown()

// Now start all servers back up.
c.restartAll()
c.waitOnAllCurrent()

checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})
}