diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8761f786b30..9b3a0bd0168 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -21,8 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" - "io/fs" "math/rand" "os" "path" @@ -2798,85 +2796,6 @@ func TestJetStreamClusterKeyValueDesyncAfterHardKill(t *testing.T) { c.waitOnClusterReady() c.waitOnAllCurrent() - getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { - t.Helper() - srv := c.streamLeader(accountName, streamName) - if srv == nil { - return nil - } - jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) - require_NoError(t, err) - for _, acc := range jsz.AccountDetails { - if acc.Name == accountName { - for _, stream := range acc.Streams { - if stream.Name == streamName { - return &stream - } - } - } - } - t.Error("Could not find account details") - return nil - } - - checkState := func(t *testing.T, c *cluster, accountName, streamName string) error { - t.Helper() - - leaderSrv := c.streamLeader(accountName, streamName) - if leaderSrv == nil { - return fmt.Errorf("no leader server found for stream %q", streamName) - } - streamLeader := getStreamDetails(t, c, accountName, streamName) - if streamLeader == nil { - return fmt.Errorf("no leader found for stream %q", streamName) - } - var errs []error - for _, srv := range c.servers { - if srv == leaderSrv { - // Skip self - continue - } - acc, err := srv.LookupAccount(accountName) - require_NoError(t, err) - stream, err := acc.lookupStream(streamName) - require_NoError(t, err) - state := stream.state() - - if state.Msgs != streamLeader.State.Msgs { - err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", - streamName, leaderSrv, streamLeader.State.Msgs, - srv, state.Msgs, - ) - errs = append(errs, err) - } - if state.FirstSeq != streamLeader.State.FirstSeq { - err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.FirstSeq, - srv, state.FirstSeq, - ) - errs = append(errs, err) - } - if state.LastSeq != streamLeader.State.LastSeq { - err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.LastSeq, - srv, state.LastSeq, - ) - errs = append(errs, err) - } - if state.NumDeleted != streamLeader.State.NumDeleted { - err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.NumDeleted, - srv, state.NumDeleted, - ) - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errors.Join(errs...) - } - return nil - } - err = checkState(t, c, "$G", "KV_inconsistency") require_NoError(t, err) } @@ -2935,84 +2854,6 @@ func TestJetStreamClusterKeyValueSync(t *testing.T) { var counter int64 var errorCounter int64 - getStreamDetails := func(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { - t.Helper() - srv := c.streamLeader(accountName, streamName) - if srv == nil { - return nil - } - jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) - require_NoError(t, err) - for _, acc := range jsz.AccountDetails { - if acc.Name == accountName { - for _, stream := range acc.Streams { - if stream.Name == streamName { - return &stream - } - } - } - } - t.Error("Could not find account details") - return nil - } - checkState := func(t *testing.T, c *cluster, accountName, streamName string) error { - t.Helper() - - leaderSrv := c.streamLeader(accountName, streamName) - if leaderSrv == nil { - return fmt.Errorf("no leader server found for stream %q", streamName) - } - streamLeader := getStreamDetails(t, c, accountName, streamName) - if streamLeader == nil { - return fmt.Errorf("no leader found for stream %q", streamName) - } - var errs []error - for _, srv := range c.servers { - if srv == leaderSrv { - // Skip self - continue - } - acc, err := srv.LookupAccount(accountName) - require_NoError(t, err) - stream, err := acc.lookupStream(streamName) - require_NoError(t, err) - state := stream.state() - - if state.Msgs != streamLeader.State.Msgs { - err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", - streamName, leaderSrv, streamLeader.State.Msgs, - srv, state.Msgs, - ) - errs = append(errs, err) - } - if state.FirstSeq != streamLeader.State.FirstSeq { - err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.FirstSeq, - srv, state.FirstSeq, - ) - errs = append(errs, err) - } - if state.LastSeq != streamLeader.State.LastSeq { - err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", - streamName, leaderSrv, streamLeader.State.LastSeq, - srv, state.LastSeq, - ) - errs = append(errs, err) - } - if state.NumDeleted != streamLeader.State.NumDeleted { - err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n", - streamName, leaderSrv, streamLeader.State.NumDeleted, - srv, state.NumDeleted, streamLeader.State, state, - ) - errs = append(errs, err) - } - } - if len(errs) > 0 { - return errors.Join(errs...) - } - return nil - } - checkMsgsEqual := func(t *testing.T, accountName, streamName string) error { // Gather all the streams replicas and compare contents. msets := make(map[*Server]*stream) @@ -4218,39 +4059,13 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { }) require_NoError(t, err) - copyDir := func(dst, src string) error { - srcFS := os.DirFS(src) - return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error { - if err != nil { - return err - } - newPath := path.Join(dst, p) - if d.IsDir() { - return os.MkdirAll(newPath, defaultDirPerms) - } - r, err := srcFS.Open(p) - if err != nil { - return err - } - defer r.Close() - - w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms) - if err != nil { - return err - } - defer w.Close() - _, err = io.Copy(w, r) - return err - }) - } - // Simulate being hard killed by: // 1. copy directories before shutdown copyToSrcMap := make(map[string]string) for _, s := range c.servers { sd := s.StoreDir() copySd := path.Join(t.TempDir(), JetStreamStoreDir) - err = copyDir(copySd, sd) + err = copyDir(t, copySd, sd) require_NoError(t, err) copyToSrcMap[copySd] = sd } @@ -4263,7 +4078,7 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { for cp, dest := range copyToSrcMap { err = os.RemoveAll(dest) require_NoError(t, err) - err = copyDir(dest, cp) + err = copyDir(t, dest, cp) require_NoError(t, err) } @@ -4452,3 +4267,78 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) } } } + +func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Reconnect to the leader. + leader := c.streamLeader(globalAccountName, "TEST") + nc.Close() + nc, js = jsClientConnect(t, leader) + defer nc.Close() + + lookupStream := func(s *Server) *stream { + t.Helper() + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + return mset + } + + // Stop one follower so it lags behind. + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + mset := lookupStream(rs) + n := mset.node.(*raft) + followerSnapshots := path.Join(n.sd, snapshotsDir) + rs.Shutdown() + rs.WaitForShutdown() + + // Move the stream forward so the follower requires a snapshot. + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10}) + require_NoError(t, err) + _, err = js.Publish("foo", nil) + require_NoError(t, err) + + // Install a snapshot on the leader, ensuring RAFT entries are compacted and a snapshot remains. + mset = lookupStream(leader) + n = mset.node.(*raft) + err = n.InstallSnapshot(mset.stateSnapshot()) + require_NoError(t, err) + + c.stopAll() + + // Replace follower snapshot with the leader's. + // This simulates the follower coming online, getting a snapshot from the leader after which it goes offline. + leaderSnapshots := path.Join(n.sd, snapshotsDir) + err = os.RemoveAll(followerSnapshots) + require_NoError(t, err) + err = copyDir(t, followerSnapshots, leaderSnapshots) + require_NoError(t, err) + + // Start the follower, it will load the snapshot from the leader. + rs = c.restartServer(rs) + + // Shutting down must check that the leader's snapshot is not overwritten. + rs.Shutdown() + rs.WaitForShutdown() + + // Now start all servers back up. + c.restartAll() + c.waitOnAllCurrent() + + checkFor(t, 10*time.Second, 500*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 690a76923a7..1512012a0bf 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -19,11 +19,15 @@ package server import ( "context" "encoding/json" + "errors" "fmt" + "io" + "io/fs" "math/rand" "net" "net/url" "os" + "path" "strings" "sync" "testing" @@ -1913,3 +1917,109 @@ func (b *bitset) String() string { sb.WriteString("\n") return sb.String() } + +func copyDir(t *testing.T, dst, src string) error { + t.Helper() + srcFS := os.DirFS(src) + return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error { + if err != nil { + return err + } + newPath := path.Join(dst, p) + if d.IsDir() { + return os.MkdirAll(newPath, defaultDirPerms) + } + r, err := srcFS.Open(p) + if err != nil { + return err + } + defer r.Close() + + w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms) + if err != nil { + return err + } + defer w.Close() + _, err = io.Copy(w, r) + return err + }) +} + +func getStreamDetails(t *testing.T, c *cluster, accountName, streamName string) *StreamDetail { + t.Helper() + srv := c.streamLeader(accountName, streamName) + if srv == nil { + return nil + } + jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true}) + require_NoError(t, err) + for _, acc := range jsz.AccountDetails { + if acc.Name == accountName { + for _, stream := range acc.Streams { + if stream.Name == streamName { + return &stream + } + } + } + } + t.Error("Could not find account details") + return nil +} + +func checkState(t *testing.T, c *cluster, accountName, streamName string) error { + t.Helper() + + leaderSrv := c.streamLeader(accountName, streamName) + if leaderSrv == nil { + return fmt.Errorf("no leader server found for stream %q", streamName) + } + streamLeader := getStreamDetails(t, c, accountName, streamName) + if streamLeader == nil { + return fmt.Errorf("no leader found for stream %q", streamName) + } + var errs []error + for _, srv := range c.servers { + if srv == leaderSrv { + // Skip self + continue + } + acc, err := srv.LookupAccount(accountName) + require_NoError(t, err) + stream, err := acc.lookupStream(streamName) + require_NoError(t, err) + state := stream.state() + + if state.Msgs != streamLeader.State.Msgs { + err := fmt.Errorf("[%s] Leader %v has %d messages, Follower %v has %d messages", + streamName, leaderSrv, streamLeader.State.Msgs, + srv, state.Msgs, + ) + errs = append(errs, err) + } + if state.FirstSeq != streamLeader.State.FirstSeq { + err := fmt.Errorf("[%s] Leader %v FirstSeq is %d, Follower %v is at %d", + streamName, leaderSrv, streamLeader.State.FirstSeq, + srv, state.FirstSeq, + ) + errs = append(errs, err) + } + if state.LastSeq != streamLeader.State.LastSeq { + err := fmt.Errorf("[%s] Leader %v LastSeq is %d, Follower %v is at %d", + streamName, leaderSrv, streamLeader.State.LastSeq, + srv, state.LastSeq, + ) + errs = append(errs, err) + } + if state.NumDeleted != streamLeader.State.NumDeleted { + err := fmt.Errorf("[%s] Leader %v NumDeleted is %d, Follower %v is at %d\nSTATE_A: %+v\nSTATE_B: %+v\n", + streamName, leaderSrv, streamLeader.State.NumDeleted, + srv, state.NumDeleted, streamLeader.State, state, + ) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} diff --git a/server/raft.go b/server/raft.go index 9fbb76fefb8..de57e565b43 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1134,7 +1134,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Check that a catchup isn't already taking place. If it is then we won't // allow installing snapshots until it is done. - if len(n.progress) > 0 { + if len(n.progress) > 0 || n.paused { return errCatchupsRunning }